
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

@@ -181,799 +181,12 @@
   * @author Doug Lea
   */
  public class ForkJoinPool extends AbstractExecutorService {
  
      /*
-      * Implementation Overview
-      *
-      * This class and its nested classes provide the main
-      * functionality and control for a set of worker threads.  Because
-      * most internal methods and nested classes are interrelated,
-      * their main rationale and descriptions are presented here;
-      * individual methods and nested classes contain only brief
-      * comments about details. Broadly: submissions from non-FJ
-      * threads enter into submission queues.  Workers take these tasks
-      * and typically split them into subtasks that may be stolen by
-      * other workers. Work-stealing based on randomized scans
-      * generally leads to better throughput than "work dealing" in
-      * which producers assign tasks to idle threads, in part because
-      * threads that have finished other tasks before the signalled
-      * thread wakes up (which can be a long time) can take the task
-      * instead.  Preference rules give first priority to processing
-      * tasks from their own queues (LIFO or FIFO, depending on mode),
-      * then to randomized FIFO steals of tasks in other queues.  This
-      * framework began as a vehicle for supporting tree-structured
-      * parallelism using work-stealing.  Over time, its scalability
-      * advantages led to extensions and changes to better support more
-      * diverse usage contexts.  Here's a brief history of major
-      * revisions, each also with other minor features and changes.
-      *
-      * 1. Only handle recursively structured computational tasks
-      * 2. Async (FIFO) mode and striped submission queues
-      * 3. Completion-based tasks (mainly CountedCompleters)
-      * 4. CommonPool and parallelStream support
-      * 5. InterruptibleTasks for externally submitted tasks
-      *
-      * Most changes involve adaptations of the base algorithms using
-      * combinations of static and dynamic bitwise mode settings (both
-      * here and in ForkJoinTask), and subclassing of ForkJoinTask.
-      * There are a fair number of odd code constructions and design
-      * decisions for components that reside at the edge of Java vs JVM
-      * functionality.
-      *
-      * WorkQueues
-      * ==========
-      *
-      * Most operations occur within work-stealing queues (in nested
-      * class WorkQueue).  These are special forms of Deques that
-      * support only three of the four possible end-operations -- push,
-      * pop, and poll (aka steal), under the further constraints that
-      * push and pop are called only from the owning thread (or, as
-      * extended here, under a lock), while poll may be called from
-      * other threads.  (If you are unfamiliar with them, you probably
-      * want to read Herlihy and Shavit's book "The Art of
-      * Multiprocessor Programming", chapter 16 describing these in
-      * more detail before proceeding.)  The main work-stealing queue
-      * design is roughly similar to those in the papers "Dynamic
-      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
-      * (http://research.sun.com/scalable/pubs/index.html) and
-      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
-      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
-      * The main differences ultimately stem from GC requirements that
-      * we null out taken slots as soon as we can, to maintain as small
-      * a footprint as possible even in programs generating huge
-      * numbers of tasks. To accomplish this, we shift the CAS
-      * arbitrating pop vs poll (steal) from being on the indices
-      * ("base" and "top") to the slots themselves. These provide the
-      * primary required memory ordering -- see "Correct and Efficient
-      * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
-      * Nardelli, PPoPP 2013
-      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
-      * analysis of memory ordering requirements in work-stealing
-      * algorithms similar to the one used here.  We use per-operation
-      * ordered writes of various kinds for updates, but usually use
-      * explicit load fences for reads, to cover access of several
-      * fields of possibly several objects without further constraining
-      * read-by-read ordering.
-      *
-      * We also support a user mode in which local task processing is
-      * in FIFO, not LIFO order, simply by using a local version of
-      * poll rather than pop.  This can be useful in message-passing
-      * frameworks in which tasks are never joined, although with
-      * increased contention among task producers and consumers. Also,
-      * the same data structure (and class) is used for "submission
-      * queues" (described below) holding externally submitted tasks,
-      * that differ only in that a lock (using field "phase"; see below) is
-      * required by external callers to push and pop tasks.
-      *
-      * Adding tasks then takes the form of a classic array push(task)
-      * in a circular buffer:
-      *    q.array[q.top++ % length] = task;
-      *
-      * The actual code needs to null-check and size-check the array,
-      * uses masking, not mod, for indexing a power-of-two-sized array,
-      * enforces memory ordering, supports resizing, and possibly
-      * signals waiting workers to start scanning (described below),
-      * which requires stronger forms of ordered accesses.
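-      *
-      * A composite sketch of a working push under those constraints
-      * (hypothetical locals; resizing and signalling omitted):
-      *
-      *    ForkJoinTask<?>[] a = q.array;
-      *    int s = q.top, m;
-      *    if (a != null && (m = a.length - 1) >= 0) {
-      *        a[m & s] = task;      // mask, not mod
-      *        q.top = s + 1;        // ordered write in the real code
-      *    }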
-      *
-      * The pop operation (always performed by owner) is of the form:
-      *   if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
-      *        decrement top and return task;
-      * If this fails, the queue is empty. This operation is one part
-      * of the nextLocalTask method, that instead does a local-poll
-      * in FIFO mode.
-      *
-      * The poll operation is, basically:
-      *   if (CAS nonnull task t = q.array[k = q.base % length] to null)
-      *       increment base and return task;
-      *
-      * However, there are several more cases that must be dealt with.
-      * Some of them are just due to asynchrony; others reflect
-      * contention and stealing policies. Stepping through them
-      * illustrates some of the implementation decisions in this
-      * class (a combined sketch follows the list).
-      *
-      *  * Slot k must be read with an acquiring read, as it must be
-      *    anyway to dereference and run the task if the (acquiring)
-      *    CAS succeeds; an explicit acquire fence supports the
-      *    following rechecks even if the CAS is not attempted.  To
-      *    more easily distinguish among kinds of CAS failures, we use
-      *    the compareAndExchange version, and usually handle null
-      *    returns (indicating contention) separately from others.
-      *
-      *  * q.base may change between reading and using its value to
-      *    index the slot. To avoid trying to use the wrong t, the
-      *    index and slot must be reread (not necessarily immediately)
-      *    until consistent, unless this is a local poll by owner, in
-      *    which case this form of inconsistency can only appear as t
-      *    being null, below.
-      *
-      *  * Similarly, q.array may change (due to a resize), unless this
-      *    is a local poll by owner. Otherwise, when t is present, this
-      *    only needs consideration on CAS failure (since a CAS
-      *    confirms the non-resized case.)
-      *
-      *  * t may appear null because a previous poll operation has not
-      *    yet incremented q.base, so the read is from an already-taken
-      *    index. This form of stall reflects the non-lock-freedom of
-      *    the poll operation. Stalls can be detected by observing that
-      *    q.base doesn't change on repeated reads of null t and when
-      *    no other alternatives apply, spin-wait for it to settle.  To
-      *    reduce producing these kinds of stalls by other stealers, we
-      *    encourage timely writes to indices using otherwise
-      *    unnecessarily strong writes.
-      *
-      *  * The CAS may fail, in which case we may want to retry unless
-      *    there is too much contention. One goal is to balance and
-      *    spread out the many forms of contention that may be
-      *    encountered across polling and other operations to avoid
-      *    sustained performance degradations. Across all cases where
-      *    alternatives exist, a bounded number of CAS misses or stalls
-      *    are tolerated (for slots, ctl, and elsewhere described
-      *    below) before taking alternative action. These may move
-      *    contention or retries elsewhere, which is still preferable
-      *    to single-point bottlenecks.
-      *
-      *  * Even though the check "top == base" is quiescently accurate
-      *    to determine whether a queue is empty, it is not of much use
-      *    when deciding whether to try to poll or repoll after a
-      *    failure.  Both top and base may move independently, and both
-      *    lag updates to the underlying array. To reduce memory
-      *    contention, non-owners avoid reading the "top" when
-      *    possible, by using one-ahead reads to check whether to
-      *    repoll, relying on the fact that a non-empty queue does not
-      *    have two null slots in a row, except in cases (resizes and
-      *    shifts) that can be detected with a secondary recheck that
-      *    is less likely to conflict with owner writes.
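-      *
-      * Putting these cases together, a non-owner poll is roughly the
-      * following sketch (a condensation; the real code bounds retries
-      * and handles resizing and screening):
-      *
-      *    for (;;) {
-      *        ForkJoinTask<?>[] a; int cap, b, k;
-      *        if ((a = q.array) == null || (cap = a.length) <= 0)
-      *            break;                        // terminated
-      *        ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
-      *        U.loadFence();                    // support rechecks
-      *        if (b != q.base)
-      *            continue;                     // inconsistent; reread
-      *        if (t != null && U.compareAndExchangeReference(
-      *                a, slotOffset(k), t, null) == t) {
-      *            q.updateBase(b + 1);          // took slot k
-      *            return t;
-      *        }
-      *        if (t == null && a[(k + 1) & (cap - 1)] == null)
-      *            break;                        // one-ahead read: empty
-      *    }
-      *    return null;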
-      *
-      * The poll operations in q.poll(), scan(), helpJoin(), and
-      * elsewhere differ with respect to whether other queues are
-      * available to try, and the presence or nature of screening steps
-      * when only some kinds of tasks can be taken. When trying
-      * alternatives (or failing) is an option, they uniformly give up
-      * after a bounded number of stalls and/or CAS failures, reducing
-      * contention when too many workers are polling too few tasks.
-      * Overall, in the aggregate, we ensure probabilistic
-      * non-blockingness of work-stealing at least until checking
-      * quiescence (which is intrinsically blocking): If an attempted
-      * steal fails in these ways, a scanning thief chooses a different
-      * target to try next. In contexts where alternatives aren't
-      * available, and when progress conditions can be isolated to
-      * values of a single variable, simple spinloops (using
-      * Thread.onSpinWait) are used to reduce memory traffic.
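-      *
-      * For instance, waiting out a stalled index update reduces to a
-      * loop of roughly this form (sketch):
-      *
-      *    while (b == (b = q.base))    // reread until base moves
-      *        Thread.onSpinWait();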
-      *
-      * WorkQueues are also used in a similar way for tasks submitted
-      * to the pool. We cannot mix these tasks in the same queues used
-      * by workers. Instead, we randomly associate submission queues
-      * with submitting threads, using a form of hashing.  The
-      * ThreadLocalRandom probe value serves as a hash code for
-      * choosing existing queues, and may be randomly repositioned upon
-      * contention with other submitters.  In essence, submitters act
-      * like workers except that they are restricted to executing local
-      * tasks that they submitted (or when known, subtasks thereof).
-      * Insertion of tasks in shared mode requires a lock. We use only
-      * a simple spinlock (as one role of field "phase") because
-      * submitters encountering a busy queue move to a different
-      * position to use or create other queues.  They (spin) block when
-      * registering new queues, or indirectly elsewhere, by revisiting
-      * later.
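-      *
-      * In outline, an external submission chooses a queue as follows
-      * (a sketch with hypothetical helpers; the real code also
-      * creates queues, resizes, and re-probes under contention):
-      *
-      *    int r = ThreadLocalRandom.getProbe();       // submitter hash
-      *    WorkQueue[] qs = queues;
-      *    WorkQueue q = qs[r & (qs.length - 1) & ~1]; // even index
-      *    if (q != null && q.tryLockPhase()) {        // "phase" spinlock
-      *        try { q.push(task); } finally { q.unlockPhase(); }
-      *    } else
-      *        r = ThreadLocalRandom.advanceProbe(r);  // move elsewhere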
-      *
-      * Management
-      * ==========
-      *
-      * The main throughput advantages of work-stealing stem from
-      * decentralized control -- workers mostly take tasks from
-      * themselves or each other, at rates that can exceed a billion
-      * per second.  Most non-atomic control is performed by some form
-      * of scanning across or within queues.  The pool itself creates,
-      * activates (enables scanning for and running tasks),
-      * deactivates, blocks, and terminates threads, all with minimal
-      * central information.  There are only a few properties that we
-      * can globally track or maintain, so we pack them into a small
-      * number of variables, often maintaining atomicity without
-      * blocking or locking.  Nearly all essentially atomic control
-      * state is held in a few variables that are by far most often
-      * read (not written) as status and consistency checks. We pack as
-      * much information into them as we can.
-      *
-      * Field "ctl" contains 64 bits holding information needed to
-      * atomically decide to add, enqueue (on an event queue), and
-      * dequeue and release workers.  To enable this packing, we
-      * restrict maximum parallelism to (1<<15)-1 (which is far in
-      * excess of normal operating range) to allow ids, counts, and
-      * their negations (used for thresholding) to fit into 16bit
-      * subfields.
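-      *
-      * Schematically, using the shift names that appear later in
-      * this file:
-      *
-      *    // ctl: [ RC:16 | TC:16 | 32-bit idle-worker stack top ]
-      *    int rc = (short)(ctl >>> RC_SHIFT); // released count (thresholded)
-      *    int tc = (short)(ctl >>> TC_SHIFT); // total count (thresholded)
-      *    int sp = (int)ctl;                  // top of idle stack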
-      *
-      * Field "runState" and per-WorkQueue field "phase" play similar
-      * roles, as lockable, versioned counters. Field runState also
-      * includes monotonic event bits (SHUTDOWN, STOP, and TERMINATED).
-      * The version tags enable detection of state changes (by
-      * comparing two reads) modulo bit wraparound. The bit range in
-      * each case suffices for purposes of determining quiescence,
-      * termination, avoiding ABA-like errors, and signal control, most
-      * of which are ultimately based on at most 15bit ranges (due to
-      * 32767 max total workers). RunState updates do not need to be
-      * atomic with respect to ctl updates, but because they are not,
-      * some care is required to avoid stalls. The seqLock properties
-      * detect changes and conditionally upgrade to coordinate with
-      * updates. It is typically held for less than a dozen
-      * instructions unless the queue array is being resized, during
-      * which contention is rare. To be conservative, lockRunState is
-      * implemented as a spin/sleep loop. Here and elsewhere spin
-      * constants are short enough to apply even on systems with few
-      * available processors.  In addition to checking pool status,
-      * reads of runState sometimes serve as acquire fences before
-      * reading other fields.
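-      *
-      * The versioned-read idiom is roughly (sketch):
-      *
-      *    int rs = runState;        // volatile read; acquire fence
-      *    WorkQueue[] qs = queues;  // plus other dependent reads
-      *    if (rs != runState)
-      *        continue;             // version changed; retry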
-      *
-      * Field "parallelism" holds the target parallelism (normally
-      * corresponding to pool size). Users can dynamically reset target
-      * parallelism, but is only accessed when signalling or awaiting
-      * work, so only slowly has an effect in creating threads or
-      * letting them time out and terminate when idle.
-      *
-      * Array "queues" holds references to WorkQueues.  It is updated
-      * (only during worker creation and termination) under the
-      * runState lock. It is otherwise concurrently readable but reads
-      * for use in scans (see below) are always prefaced by a volatile
-      * read of runState (or equivalent constructions), ensuring that
-      * its state is current at the point it is used (which is all we
-      * require). To simplify index-based operations, the array size is
-      * always a power of two, and all readers must tolerate null
-      * slots.  Worker queues are at odd indices. Worker phase ids
-      * masked with SMASK match their index. Shared (submission) queues
-      * are at even indices. Grouping them together in this way aids in
-      * task scanning: At top-level, both kinds of queues should be
-      * sampled with approximately the same probability, which is
-      * simpler if they are all in the same array. But we also need to
-      * identify what kind they are without looking at them, leading to
-      * this odd/even scheme. One disadvantage is that there are
-      * usually many fewer submission queues, so there can be many
-      * wasted probes (null slots). But this is still cheaper than
-      * alternatives. Other loops over the queues array vary in origin
-      * and stride depending on whether they cover only submission
-      * (even) or worker (odd) queues or both, and whether they require
-      * randomness (in which case cyclically exhaustive strides may be
-      * used).
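-      *
-      * For example, a check restricted to submission queues (as in
-      * parts of quiescence checking) can use an even origin and a
-      * stride of two (sketch):
-      *
-      *    WorkQueue[] qs = queues;
-      *    int n = (qs == null) ? 0 : qs.length;  // power of two
-      *    for (int i = 0; i < n; i += 2) {       // even indices only
-      *        WorkQueue q;
-      *        if ((q = qs[i]) != null && q.top - q.base > 0)
-      *            return false;                  // found a submission
-      *    }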
-      *
-      * All worker thread creation is on-demand, triggered by task
-      * submissions, replacement of terminated workers, and/or
-      * compensation for blocked workers. However, all other support
-      * code is set up to work with other policies.  To ensure that we
-      * do not hold on to worker or task references that would prevent
-      * GC, all accesses to workQueues in waiting, signalling, and
-      * control methods are via indices into the queues array (which is
-      * one source of some of the messy code constructions here). In
-      * essence, the queues array serves as a weak reference
-      * mechanism. In particular, the stack top subfield of ctl stores
-      * indices, not references. Operations on queues obtained from
-      * these indices remain valid (with at most some unnecessary extra
-      * work) even if an underlying worker failed and was replaced by
-      * another at the same index. During termination, worker queue
-      * array updates are disabled.
-      *
-      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
-      * cannot let workers spin indefinitely scanning for tasks when
-      * none can be found immediately, and we cannot start/resume
-      * workers unless there appear to be tasks available.  On the
-      * other hand, we must quickly prod them into action when new
-      * tasks are submitted or generated. These latencies are mainly a
-      * function of JVM park/unpark (and underlying OS) performance,
-      * which can be slow and variable (even though usages are
-      * streamlined as much as possible).  In many usages, ramp-up time
-      * is the main limiting factor in overall performance, which is
-      * compounded at program start-up by JIT compilation and
-      * allocation. On the other hand, throughput degrades when too
-      * many threads poll for too few tasks. (See below.)
-      *
-      * The "ctl" field atomically maintains total and "released"
-      * worker counts, plus the head of the available worker queue
-      * (actually stack, represented by the lower 32bit subfield of
-      * ctl).  Released workers are those known to be scanning for
-      * and/or running tasks (we cannot accurately determine
-      * which). Unreleased ("available") workers are recorded in the
-      * ctl stack. These workers are made eligible for signalling by
-      * enqueuing in ctl (see method runWorker).  This "queue" is a
-      * form of Treiber stack. This is ideal for activating threads in
-      * most-recently used order, and improves performance and
-      * locality, outweighing the disadvantages of being prone to
-      * contention and inability to release a worker unless it is
-      * topmost on stack. The top stack state holds the value of the
-      * "phase" field of the worker: its index and status, plus a
-      * version counter that, together with the count subfields (which
-      * also serve as version stamps), provides protection against
-      * Treiber stack ABA effects.
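-      *
-      * Enqueuing an idle worker w is in essence a Treiber push on the
-      * low half of ctl (a sketch; the real transition also adjusts
-      * the release count):
-      *
-      *    long c = ctl;
-      *    w.stackPred = (int)c;             // link to previous top
-      *    long nc = (w.phase & LMASK) | (c & ~LMASK);
-      *    c = compareAndExchangeCtl(c, nc); // w is now topmost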
-      *
-      * Creating workers. To create a worker, we pre-increment counts
-      * (serving as a reservation), and attempt to construct a
-      * ForkJoinWorkerThread via its factory. On starting, the new
-      * thread first invokes registerWorker, where it constructs a
-      * WorkQueue and is assigned an index in the queues array
-      * (expanding the array if necessary).  Upon any exception across
-      * these steps, or null return from factory, deregisterWorker
-      * adjusts counts and records accordingly.  On a null return, the
-      * pool continues running with fewer than the target number of
-      * workers. On an exception, it is propagated, generally to some
-      * external caller.
-      *
-      * WorkQueue field "phase" encodes the queue array id in lower
-      * bits, and otherwise acts similarly to the pool runState field:
-      * The "IDLE" bit is clear while active (either a released worker
-      * or a locked external queue), with other bits serving as a
-      * version counter to distinguish changes across multiple reads.
-      * Note that phase field updates lag queue CAS releases; seeing a
-      * non-idle phase does not guarantee that the worker is available
-      * (and so is never checked in this way).
-      *
-      * The ctl field also serves as the basis for memory
-      * synchronization surrounding activation. This uses a more
-      * efficient version of a Dekker-like rule that task producers and
-      * consumers sync with each other by both writing/CASing ctl (even
-      * if to its current value).  However, rather than CASing ctl to
-      * its current value in the common case where no action is
-      * required, we reduce write contention by ensuring that
-      * signalWork invocations are prefaced with a fully fenced memory
-      * access (which is usually needed anyway).
-      *
-      * Signalling. Signals (in signalWork) cause new or reactivated
-      * workers to scan for tasks.  Method signalWork and its callers
-      * try to approximate the unattainable goal of having the right
-      * number of workers activated for the tasks at hand, but must err
-      * on the side of too many workers vs too few to avoid stalls:
-      *
-      *  * If computations are purely tree structured, it suffices for
-      *    every worker to activate another when it pushes a task into
-      *    an empty queue, resulting in O(log(#threads)) steps to full
-      *    activation. Emptiness must be conservatively approximated,
-      *    sometimes resulting in unnecessary signals.  Also, to reduce
-      *    resource usages in some cases, at the expense of slower
-      *    startup in others, activation of an idle thread is preferred
-      *    over creating a new one, here and elsewhere.
-      *
-      *  * If instead, tasks come in serially from only a single
-      *    producer, each worker taking its first (since the last
-      *    activation) task from a queue should propagate a signal if
-      *    there are more tasks in that queue. This is equivalent to,
-      *    but generally faster than, arranging that the stealer take
-      *    multiple tasks, re-pushing one or more on its own queue, and
-      *    signalling (because its queue is empty), also resulting in
-      *    logarithmic full activation time.
-      *
-      *  * Because we don't know about usage patterns (or most commonly,
-      *    mixtures), we use both approaches, which present even more
-      *    opportunities to over-signal.  Note that in either of these
-      *    contexts, signals may be (and often are) unnecessary because
-      *    active workers continue scanning after running tasks without
-      *    the need to be signalled (which is one reason work stealing
-      *    is often faster than alternatives), so additional workers
-      *    aren't needed. We filter out some of these cases by exiting
-      *    retry loops in signalWork if the task responsible for the
-      *    signal has already been taken.
-      *
-      * * For rapidly branching tasks that require full pool resources,
-      *   oversignalling is OK, because signalWork will soon have no
-      *   more workers to create or reactivate. But for others (mainly
-      *   externally submitted tasks), overprovisioning may cause very
-      *   noticeable slowdowns due to contention and resource
-      *   wastage. All remedies are intrinsically heuristic. We use a
-      *   strategy that works well in most cases: We track "sources"
-      *   (queue ids) of non-empty (usually polled) queues while
-      *   scanning. These are maintained in the "source" field of
-      *   WorkQueues for use in method helpJoin and elsewhere (see
-      *   below). We also maintain them as arguments/results of
-      *   top-level polls (argument "window" in method scan, with setup
-      *   in method runWorker) as an encoded sliding window of current
-      *   and previous two sources (or INVALID_ID if none), and stop
-      *   signalling when all were from the same source. These
-      *   mechanisms may result in transiently too few workers, but
-      *   once workers poll from a new source, they rapidly reactivate
-      *   others.
-      *
-      * * Despite these, signal contention and overhead effects still
-      *   occur during ramp-up and ramp-down of small computations.
-      *
-      * Scanning. Method scan performs top-level scanning for (and
-      * execution of) tasks by polling a pseudo-random permutation of
-      * the array (by starting at a given index, and using a constant
-      * cyclically exhaustive stride).  It uses the same basic polling
-      * method as WorkQueue.poll(), but restarts with a different
-      * permutation on each invocation.  The pseudorandom generator
-      * need not have high-quality statistical properties in the long
-      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
-      * from ThreadLocalRandom probes, which are cheap and
-      * suffice. Scans do not otherwise explicitly take into account
-      * core affinities, loads, cache localities, etc. However, they do
-      * exploit temporal locality (which usually approximates these) by
-      * preferring to re-poll from the same queue (either in method
-      * tryPoll() or scan) after a successful poll before trying others
-      * (see method topLevelExec), which also reduces bookkeeping,
-      * cache traffic, and scanning overhead. But it also reduces
-      * fairness, which is partially counteracted by giving up on
-      * contention.
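-      *
-      * The per-scan randomization is cheap; roughly (sketch, with r
-      * seeded from the ThreadLocalRandom probe, and tryPollFrom a
-      * hypothetical stand-in for the polling step):
-      *
-      *    r ^= r << 13; r ^= r >>> 17; r ^= r << 5;  // xorshift
-      *    int step = (r >>> 16) | 1;   // odd: cyclically exhaustive
-      *    for (int i = r, j = qs.length; j > 0; --j, i += step)
-      *        tryPollFrom(qs[i & (qs.length - 1)]);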
-      *
-      * Deactivation. When method scan indicates that no tasks are
-      * found by a worker, it tries to deactivate (in awaitWork),
-      * giving up (and rescanning) on ctl contention. To avoid missed
-      * signals, the method rescans and reactivates if a signal may
-      * have been missed during deactivation, filtering out most cases
-      * in which this is unnecessary. Because
-      * idle workers are often not yet blocked (parked), we use the
-      * WorkQueue parking field to advertise that a waiter actually
-      * needs unparking upon signal.
-      *
-      * Quiescence. Workers scan looking for work, giving up when they
-      * don't find any, without being sure that none are available.
-      * However, some required functionality relies on consensus about
-      * quiescence (also termination, discussed below). The count
-      * fields in ctl allow accurate discovery of states in which all
-      * workers are idle.  However, because external (asynchronous)
-      * submitters are not part of this vote, these mechanisms
-      * themselves do not guarantee that the pool is in a quiescent
-      * state with respect to methods isQuiescent, shutdown (which
-      * begins termination when quiescent), helpQuiesce, and indirectly
-      * others including tryCompensate. Method quiescent() is
-      * used in all of these contexts. It provides checks that all
-      * workers are idle and there are no submissions that they could
-      * poll if they were not idle, retrying on inconsistent reads of
-      * queues and using the runState seqLock to retry on queue array
-      * updates.  (It also reports quiescence if the pool is
-      * terminating.) A true report means only that there was a moment
-      * at which quiescence held.  False negatives are inevitable (for
-      * example when queue indices lag updates, as described above),
-      * which is accommodated when (tentatively) idle by scanning for
-      * work etc., and then re-invoking. This includes cases in which
-      * the final unparked thread (in awaitWork) uses quiescent()
-      * to check for tasks that could have been added during a race
-      * window that would not be accompanied by a signal, in which case
-      * it re-activates itself (or any other worker) to rescan. Method
-      * helpQuiesce acts similarly but cannot rely on ctl counts to
-      * determine that all workers are inactive because the caller and
-      * any others executing helpQuiesce are not included in counts.
-      *
-      * Termination. A call to shutdownNow invokes tryTerminate to
-      * atomically set a runState mode bit.  However, the process of
-      * termination is intrinsically non-atomic. The calling thread, as
-      * well as other workers terminating thereafter, helps cancel
-      * queued tasks and interrupt other workers. These actions race
-      * with
-      * unterminated workers.  By default, workers check for
-      * termination only when accessing pool state.  This may take a
-      * while but suffices for structured computational tasks.  But not
-      * necessarily for others. Class InterruptibleTask (see below)
-      * further arranges runState checks before executing task bodies,
-      * and ensures interrupts while terminating. Even so, there are no
-      * guarantees after an abrupt shutdown that remaining tasks
-      * complete normally or exceptionally or are cancelled.
-      * Termination may fail to complete if running tasks ignore both
-      * task status and interrupts and/or produce more tasks after
-      * others that could cancel them have exited.
-      *
-      * Trimming workers. To release resources after periods of lack of
-      * use, a worker starting to wait when the pool is quiescent will
-      * time out and terminate if the pool has remained quiescent for
-      * the period given by field keepAlive (default 60sec), which applies
-      * to the first timeout of a fully populated pool. Subsequent (or
-      * other) cases use delays such that, if still quiescent, all will
-      * be released before one additional keepAlive unit elapses.
-      *
-      * Joining Tasks
-      * =============
-      *
-      * The "Join" part of ForkJoinPools consists of a set of
-      * mechanisms that sometimes or always (depending on the kind of
-      * task) avoid context switching or adding worker threads when one
-      * task would otherwise be blocked waiting for completion of
-      * another, basically, just by running that task or one of its
-      * subtasks if not already taken. These mechanics are disabled for
-      * InterruptibleTasks, which guarantee that callers do not execute
-      * submitted tasks.
-      *
-      * The basic structure of joining is an extended spin/block scheme
-      * in which workers check for task completion status between
-      * steps to find other work, until relevant pool state stabilizes
-      * enough to believe that no such tasks are available, at which
-      * point they block. This usually yields a good choice of when to
-      * block, one that would otherwise be harder to approximate.
-      *
-      * These forms of helping may increase stack space usage, but in
-      * tree/dag structured procedurally parallel designs that space is
-      * bounded to no more than it would be if the task were executed
-      * only by the joining thread. This is arranged by associated task
-      * subclasses that also help detect and control the ways in which
-      * this may occur.
-      *
-      * Normally, the first option when joining a task that is not done
-      * is to try to take it from the local queue and run it. Method
-      * tryRemoveAndExec tries to do so.  For tasks with any form of
-      * subtasks that must be completed first, we try to locate these
-      * subtasks and run them as well. This is easy when local, but
-      * when stolen, steal-backs are restricted to the same rules as
-      * stealing (polling), which requires additional bookkeeping and
-      * scanning. This cost is still very much worthwhile because of
-      * its impact on task scheduling and resource control.
-      *
-      * The two methods for finding and executing subtasks vary in
-      * details.  The algorithm in helpJoin entails a form of "linear
-      * helping".  Each worker records (in field "source") the index of
-      * the internal queue from which it last stole a task. (Note:
-      * because chains cannot include even-numbered external queues,
-      * they are ignored, and 0 is an OK default.) The scan in method
-      * helpJoin uses these markers to try to find a worker to help
-      * (i.e., steal back a task from and execute it) that could make
-      * progress toward completion of the actively joined task.  Thus,
-      * the joiner executes a task that would be on its own local deque
-      * if the to-be-joined task had not been stolen. This is a
-      * conservative variant of the approach described in Wagner &
-      * Calder "Leapfrogging: a portable technique for implementing
-      * efficient futures" SIGPLAN Notices, 1993
-      * (http://portal.acm.org/citation.cfm?id=155354). It differs
-      * mainly in that we only record queues, not full dependency
-      * links.  This requires a linear scan of the queues to locate
-      * stealers, but isolates cost to when it is needed, rather than
-      * adding to per-task overhead.  For CountedCompleters, the
-      * analogous method helpComplete doesn't need stealer-tracking,
-      * but requires a similar (but simpler) check of completion
-      * chains.
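-      *
-      * In sketch form, the linear-helping scan looks for a worker
-      * whose recorded source is the joiner's own queue (a
-      * condensation; the real code revalidates and follows chains):
-      *
-      *    int id = joinerQueue.phase & SMASK;   // joiner's index
-      *    for (WorkQueue q : queues) {
-      *        if (q != null && (q.source & SMASK) == id) {
-      *            // q likely stole the joined task or an ancestor;
-      *            // steal back and run a task from q, then recheck
-      *        }
-      *    }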
-      *
-      * In either case, searches can fail to locate stealers when
-      * stalls delay recording sources or issuing subtasks. We avoid
-      * some of these cases by using snapshotted values of ctl as a
-      * check that the numbers of workers are not changing, along with
-      * rescans to deal with contention and stalls.  But even when
-      * accurately identified, stealers might not ever produce a task
-      * that the joiner can in turn help with.
-      *
-      * Related method helpAsyncBlocker does not directly rely on
-      * subtask structure, but instead avoids or postpones blocking of
-      * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by
-      * executing other asyncs that can be processed in any order.
-      * This is currently invoked only in non-join-based blocking
-      * contexts from classes CompletableFuture and
-      * SubmissionPublisher, which could be further generalized.
-      *
-      * When any of the above fail to avoid blocking, we rely on
-      * "compensation" -- an indirect form of context switching that
-      * either activates an existing worker to take the place of the
-      * blocked one, or expands the number of workers.
-      *
-      * Compensation does not by default aim to keep exactly the target
-      * parallelism number of unblocked threads running at any given
-      * time. Some previous versions of this class employed immediate
-      * compensations for any blocked join. However, in practice, the
-      * vast majority of blockages are transient byproducts of GC and
-      * other JVM or OS activities that are made worse by replacement
-      * by causing longer-term oversubscription. These are inevitable
-      * without (unobtainably) perfect information about whether worker
-      * creation is actually necessary.  False alarms are common enough
-      * to negatively impact performance, so compensation is by default
-      * attempted only when it appears possible that the pool could
-      * stall due to lack of any unblocked workers.  However, we allow
-      * users to override defaults using the long form of the
-      * ForkJoinPool constructor. The compensation mechanism may also
-      * be bounded.  Bounds for the commonPool better enable JVMs to
-      * cope with programming errors and abuse before running out of
-      * resources to do so.
-      *
-      * The ManagedBlocker extension API can't use helping, so it
-      * relies only on compensation in method awaitBlocker. This API
-      * was
-      * designed to highlight the uncertainty of compensation decisions
-      * by requiring implementation of method isReleasable to abort
-      * compensation during attempts to obtain a stable snapshot. But
-      * users now rely upon the fact that if isReleasable always
-      * returns false, the API can be used to obtain precautionary
-      * compensation, which is sometimes the only reasonable option
-      * when running unknown code in tasks; this usage is now supported
-      * more simply (see method beginCompensatedBlock).
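-      *
-      * A usage sketch of that precautionary form (here,
-      * doPossiblyBlockingWork stands in for arbitrary blocking code):
-      *
-      *    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
-      *        public boolean block() {
-      *            doPossiblyBlockingWork();
-      *            return true;          // done; no retry needed
-      *        }
-      *        public boolean isReleasable() {
-      *            return false;         // always compensate first
-      *        }
-      *    });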
-      *
-      * Common Pool
-      * ===========
-      *
-      * The static common pool always exists after static
-      * initialization.  Since it (or any other created pool) need
-      * never be used, we minimize initial construction overhead and
-      * footprint to the setup of about a dozen fields, although with
-      * some System property parsing and security processing that takes
-      * far longer than the actual construction when SecurityManagers
-      * are used or properties are set. The common pool is
-      * distinguished by having a null workerNamePrefix (which is an
-      * odd convention, but avoids the need to decode status in factory
-      * classes).  It also has PRESET_SIZE config set if parallelism
-      * was configured by system property.
-      *
-      * When external threads use the common pool, they can perform
-      * subtask processing (see helpComplete and related methods) upon
-      * joins, unless they are submitted using ExecutorService
-      * submission methods, which implicitly disallow this.  This
-      * caller-helps policy makes it sensible to set common pool
-      * parallelism level to one (or more) less than the total number
-      * of available cores, or even zero for pure caller-runs. External
-      * threads waiting for joins first check the common pool for their
-      * task, which fails quickly if the caller did not fork to common
-      * pool.
-      *
-      * Guarantees for common pool parallelism zero are limited to
-      * tasks that are joined by their callers in a tree-structured
-      * fashion or use CountedCompleters (as is true for jdk
-      * parallelStreams). Support infiltrates several methods,
-      * including those that retry helping steps until we are sure that
-      * none apply if there are no workers.
-      *
-      * As a more appropriate default in managed environments, unless
-      * overridden by system properties, we use workers of subclass
-      * InnocuousForkJoinWorkerThread when there is a SecurityManager
-      * present. These workers have no permissions set, do not belong
-      * to any user-defined ThreadGroup, and clear all ThreadLocals
-      * after executing any top-level task.  The associated mechanics
-      * may be JVM-dependent and must access particular Thread class
-      * fields to achieve this effect.
-      *
-      * InterruptibleTasks
-      * ==================
-      *
-      * Regular ForkJoinTasks manage task cancellation (method cancel)
-      * independently from the interrupt status of threads running
-      * tasks.  Interrupts are issued internally only while
-      * terminating, to wake up workers and cancel queued tasks.  By
-      * default, interrupts are cleared only when necessary to ensure
-      * that calls to LockSupport.park do not loop indefinitely (park
-      * returns immediately if the current thread is interrupted).
-      *
-      * To comply with ExecutorService specs, we use subclasses of
-      * abstract class InterruptibleTask for tasks that require
-      * stronger interruption and cancellation guarantees.  External
-      * submitters never run these tasks, even if in the common pool.
-      * InterruptibleTasks include a "runner" field (implemented
-      * similarly to FutureTask) to support cancel(true).  Upon pool
-      * shutdown, runners are interrupted so they can cancel. Since
-      * external joining callers never run these tasks, they must await
-      * cancellation by others, which can occur along several different
-      * paths.
-      *
-      * Across these APIs, rules for reporting exceptions for tasks
-      * with results accessed via join() differ from those via get(),
-      * which differ from those invoked using pool submit methods by
-      * non-workers (which comply with Future.get() specs). Internal
-      * usages of ForkJoinTasks ignore interrupt status when executing
-      * or awaiting completion.  Otherwise, reporting task results or
-      * exceptions is preferred to throwing InterruptedExceptions,
-      * which are in turn preferred to timeouts. Similarly, completion
-      * status is preferred to reporting cancellation.  Cancellation is
-      * reported as an unchecked exception by join(), and by worker
-      * calls to get(), but is otherwise wrapped in a (checked)
-      * ExecutionException.
-      *
-      * Worker threads cannot be VirtualThreads, as enforced by
-      * requiring ForkJoinWorkerThreads in factories.  There are
-      * several constructions relying on this.  However, as of this
-      * writing, virtual thread bodies are by default run as some form
-      * of InterruptibleTask.
-      *
-      * Memory placement
-      * ================
-      *
-      * Performance is very sensitive to placement of instances of
-      * ForkJoinPool and WorkQueues and their queue arrays, as well as
-      * the placement of their fields. Cache misses and contention due
-      * to false-sharing have been observed to slow down some programs
-      * by more than a factor of four. Effects may vary across initial
-      * memory configurations, applications, and different garbage
-      * collectors and GC settings, so there is no perfect solution.
-      * Too much isolation may generate more cache misses in common
-      * cases (because some fields and slots are usually read at the
-      * same time). The @Contended annotation provides only rough
-      * control (for good reason). Similarly for relying on fields
-      * being placed in size-sorted declaration order.
-      *
-      * We isolate the ForkJoinPool.ctl field that otherwise causes the
-      * most false-sharing misses with respect to other fields. Also,
-      * ForkJoinPool fields are ordered such that fields less prone to
-      * contention effects are first, offsetting those that otherwise
-      * would be, while also reducing total footprint vs using
-      * multiple @Contended regions, which tends to slow down
-      * less-contended applications. For class WorkQueue, an
-      * embedded @Contended region segregates fields most heavily
-      * updated by owners from those most commonly read by stealers or
-      * other management.  Initial sizing and resizing of WorkQueue
-      * arrays is an even more delicate tradeoff because the best
-      * strategy systematically varies across garbage collectors. Small
-      * arrays are better for locality and reduce GC scan time, but
-      * large arrays reduce both direct false-sharing and indirect
-      * cases due to GC bookkeeping (cardmarks etc), and reduce the
-      * number of resizes, which are not especially fast because they
-      * require atomic transfers.  Currently, arrays are initialized to
-      * be fairly small.  (Maintenance note: any changes in fields,
-      * queues, or their uses, or JVM layout policies, must be
-      * accompanied by re-evaluation of these placement and sizing
-      * decisions.)
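-      *
-      * The WorkQueue idiom is roughly the following (a sketch of the
-      * annotation usage, not the actual field layout):
-      *
-      *    @jdk.internal.vm.annotation.Contended("w") // owner-hot group
-      *    int top;
-      *    @jdk.internal.vm.annotation.Contended("w")
-      *    int nsteals;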
-      *
-      * Style notes
-      * ===========
-      *
-      * Memory ordering relies mainly on atomic operations (CAS,
-      * getAndSet, getAndAdd) along with moded accesses. These use
-      * jdk-internal Unsafe for atomics and special memory modes,
-      * rather than VarHandles, to avoid initialization dependencies in
-      * other jdk components that require early parallelism.  This can
-      * be awkward and ugly, but also reflects the need to control
-      * outcomes across the unusual cases that arise in very racy code
-      * with very few invariants. All atomic task slot updates use
-      * Unsafe operations requiring offset positions, not indices, as
-      * computed by method slotOffset. All fields are read into locals
-      * before use, and null-checked if they are references, even if
-      * they can never be null under current usages. Usually,
-      * computations (held in local variables) are defined as soon as
-      * logically enabled, sometimes to convince compilers that they
-      * may be performed despite memory ordering constraints.  Array
-      * accesses using masked indices include checks (that are always
-      * true) that the array length is non-zero to avoid compilers
-      * inserting more expensive traps.  This is usually done in a
-      * "C"-like style of listing declarations at the heads of methods
-      * or blocks, and using inline assignments on first encounter.
-      * Nearly all explicit checks lead to bypass/return, not exception
-      * throws, because they may legitimately arise during shutdown. A
-      * few unusual loop constructions hint (with varying
-      * effectiveness) to JVMs about where (not) to place safepoints.
-      *
-      * There is a lot of representation-level coupling among classes
-      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
-      * fields of WorkQueue maintain data structures managed by
-      * ForkJoinPool, so are directly accessed.  There is little point
-      * trying to reduce this, since any associated future changes in
-      * representations will need to be accompanied by algorithmic
-      * changes anyway. Several methods intrinsically sprawl because
-      * they must accumulate sets of consistent reads of fields held in
-      * local variables. Some others are artificially broken up to
-      * reduce producer/consumer imbalances due to dynamic compilation.
-      * There are also other coding oddities (including several
-      * unnecessary-looking hoisted null checks) that help some methods
-      * perform reasonably even when interpreted (not compiled).
-      *
-      * The order of declarations in this file is (with a few exceptions):
-      * (1) Static configuration constants
-      * (2) Static utility functions
-      * (3) Nested (static) classes
-      * (4) Fields, along with constants used when unpacking some of them
-      * (5) Internal control methods
-      * (6) Callbacks and other support for ForkJoinTask methods
-      * (7) Exported methods
-      * (8) Static block initializing statics in minimally dependent order
-      *
-      * Revision notes
-      * ==============
-      *
-      * The main sources of differences from the previous version are:
-      *
-      * * New abstract class ForkJoinTask.InterruptibleTask ensures
-      *   that handling of tasks submitted under the ExecutorService
-      *   API is consistent with specs.
-      * * Method quiescent() replaces previous quiescence-related
-      *   checks, relying on versioning and sequence locking instead
-      *   of ReentrantLock.
-      * * Termination processing now ensures that internal data
-      *   structures are maintained consistently enough while stopping
-      *   to interrupt all workers and cancel all tasks. It also uses a
-      *   CountDownLatch instead of a Condition for termination because
-      *   of the lock change.
-      * * Many other changes to avoid performance regressions due
-      *   to the above.
+      * Implementation Overview -- omitted until stable
+      *
       */
  
      // static configuration constants
  
      /**

@@ -1027,17 +240,10 @@
      // {pool, workQueue} config bits
      static final int FIFO             = 1 << 0;   // fifo queue or access mode
      static final int CLEAR_TLS        = 1 << 1;   // set for Innocuous workers
      static final int PRESET_SIZE      = 1 << 2;   // size was set by property
  
-     // source history window packing used in scan() and runWorker()
-     static final long RESCAN          = 1L << 63; // must be negative
-     static final long HMASK           = ((((long)SMASK) << 32) |
-                                          (((long)SMASK) << 16)); // history bits
-     static final long NO_HISTORY      = ((((long)INVALID_ID) << 32) | // no 3rd
-                                          (((long)INVALID_ID) << 16)); // no 2nd
- 
      // others
      static final int DEREGISTERED     = 1 << 31;  // worker terminating
      static final int UNCOMPENSATE     = 1 << 16;  // tryCompensate return
      static final int IDLE             = 1 << 16;  // phase seqlock/version count
  

@@ -1240,12 +446,12 @@
              U.putIntVolatile(this, BASE, v);
          }
          final void updateTop(int v) {
              U.putIntOpaque(this, TOP, v);
          }
-         final void forgetSource() {
-             U.putIntOpaque(this, SOURCE, 0);
+         final void setSource(int v) {
+             U.getAndSetInt(this, SOURCE, v);
          }
          final void updateArray(ForkJoinTask<?>[] a) {
              U.getAndSetReference(this, ARRAY, a);
          }
          final void unlockPhase() {

@@ -1315,24 +521,24 @@
                      newArray = new ForkJoinTask<?>[newCap];
                  } catch (OutOfMemoryError ex) {
                  }
                  if (newArray != null) {               // else throw on next push
                      int newMask = newCap - 1;         // poll old, push to new
+                     p = newMask & s;
                      for (int k = s, j = cap; j > 0; --j, --k) {
                          ForkJoinTask<?> u;
                          if ((u = (ForkJoinTask<?>)U.getAndSetReference(
                                   a, slotOffset(k & m), null)) == null)
                              break;                    // lost to pollers
                          newArray[k & newMask] = u;
                      }
-                     updateArray(newArray);            // fully fenced
+                     updateArray(a = newArray);        // fully fenced
                  }
-                 a = null;                             // always signal
              }
              if (!internal)
                  unlockPhase();
-             if ((a == null || a[m & (s - 1)] == null) && pool != null)
+             if ((room == 0 || a[m & (s - 1)] == null) && pool != null)
                  pool.signalWork(a, p);
          }
  
          /**
           * Takes next task, if one exists, in order specified by mode,

@@ -1341,12 +547,12 @@
           */
          private ForkJoinTask<?> nextLocalTask(int fifo) {
              ForkJoinTask<?> t = null;
              ForkJoinTask<?>[] a = array;
              int b = base, p = top, cap;
-             if (a != null && (cap = a.length) > 0) {
-                 for (int m = cap - 1, s, nb; p - b > 0; ) {
+             if (p - b > 0 && a != null && (cap = a.length) > 0) {
+                 for (int m = cap - 1, s, nb;;) {
                      if (fifo == 0 || (nb = b + 1) == p) {
                          if ((t = (ForkJoinTask<?>)U.getAndSetReference(
                                   a, slotOffset(m & (s = p - 1)), null)) != null)
                              updateTop(s);       // else lost race for only task
                          break;

@@ -1358,10 +564,12 @@
                      }
                      while (b == (b = base)) {
                          U.loadFence();
                          Thread.onSpinWait();    // spin to reduce memory traffic
                      }
+                     if (p - b <= 0)
+                         break;
                  }
              }
              return t;
          }
  

@@ -1417,13 +625,12 @@
          }
  
          /**
           * Polls for a task. Used only by non-owners.
           *
-          * @param pool if nonnull, pool to signal if more tasks exist
           */
-         final ForkJoinTask<?> poll(ForkJoinPool pool) {
+         final ForkJoinTask<?> poll() {
              for (;;) {
                  ForkJoinTask<?>[] a = array;
                  int b = base, cap, k;
                  if (a == null || (cap = a.length) <= 0)
                      break;

@@ -1435,71 +642,50 @@
                      if (t == null)
                          o = a[k];
                      else if (t == (o = U.compareAndExchangeReference(
                                         a, slotOffset(k), t, null))) {
                          updateBase(nb);
-                         if (a[nk] != null && pool != null)
-                             pool.signalWork(a, nk); // propagate
                          return t;
                      }
                      if (o == null && a[nk] == null && array == a &&
                          (phase & (IDLE | 1)) != 0 && top - base <= 0)
                          break;                    // empty
                  }
              }
              return null;
          }
  
+         // specialized execution methods
+ 
          /**
-          * Tries to poll next task in FIFO order, failing without
-          * retries on contention or stalls. Used only by topLevelExec
-          * to repoll from the queue obtained from pool.scan.
+          * Runs the given task, as well as remaining local tasks, plus
+          * those from src queue that can be taken without interference.
           */
-         private ForkJoinTask<?> tryPoll() {
-             ForkJoinTask<?>[] a; int cap;
-             if ((a = array) != null && (cap = a.length) > 0) {
-                 for (int b = base, k;;) {  // loop only if inconsistent
-                     ForkJoinTask<?> t = a[k = b & (cap - 1)];
-                     U.loadFence();
-                     if (b == (b = base)) {
-                         Object o;
-                         if (t == null)
-                             o = a[k];
-                         else if (t == (o = U.compareAndExchangeReference(
-                                            a, slotOffset(k), t, null))) {
-                             updateBase(b + 1);
-                             return t;
-                         }
-                         if (o == null)
+         final void topLevelExec(ForkJoinTask<?> task, WorkQueue src,
+                                 int srcBase, int cfg) {
+             if (task != null && src != null) {
+                 int fifo = cfg & FIFO, nstolen = 1;
+                 for (;;) {
+                     task.doExec();
+                     if ((task = nextLocalTask(fifo)) == null) {
+                         int k, cap; ForkJoinTask<?>[] a;
+                         if (src.base != srcBase ||
+                             (a = src.array) == null || (cap = a.length) <= 0 ||
+                             (task = a[k = srcBase & (cap - 1)]) == null)
+                             break;
+                         U.loadFence();
+                         if (src.base != srcBase || !U.compareAndSetReference(
+                                 a, slotOffset(k), task, null))
                              break;
+                         src.updateBase(++srcBase);
+                         ++nstolen;
                      }
                  }
+                 nsteals += nstolen;
+                 if ((cfg & CLEAR_TLS) != 0)
+                     ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
              }
-             return null;
-         }
- 
-         // specialized execution methods
- 
-         /**
-          * Runs the given (stolen) task if nonnull, as well as
-          * remaining local tasks and/or others available from the
-          * given queue, if any.
-          */
-         final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, int srcId) {
-             int cfg = config, fifo = cfg & FIFO, nstolen = nsteals + 1;
-             if ((srcId & 1) != 0) // don't record external sources
-                 source = srcId;
-             if ((cfg & CLEAR_TLS) != 0)
-                 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
-             while (task != null) {
-                 task.doExec();
-                 if ((task = nextLocalTask(fifo)) == null && src != null &&
-                     (task = src.tryPoll()) != null)
-                     ++nstolen;
-             }
-             nsteals = nstolen;
-             forgetSource();
          }
  
          /**
           * Deep form of tryUnpush: Traverses from top and removes and
           * runs task if present.

@@ -1874,43 +1060,43 @@
              if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
                  w.source = DEREGISTERED;
                  if (phase != 0) {         // else failed to start
                      replaceable = true;
                      if ((phase & IDLE) != 0)
-                         reactivate(w);    // pool stopped before released
+                         releaseAll();     // pool stopped before released
                  }
              }
          }
-         long c = ctl;
-         if (src != DEREGISTERED)          // decrement counts
+         if (src != DEREGISTERED) {        // decrement counts
+             long c = ctl;
              do {} while (c != (c = compareAndExchangeCtl(
                                     c, ((RC_MASK & (c - RC_UNIT)) |
                                         (TC_MASK & (c - TC_UNIT)) |
                                         (LMASK & c)))));
-         else if ((int)c != 0)
-             replaceable = true;           // signal below to cascade timeouts
-         if (w != null) {                  // cancel remaining tasks
-             for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
-                 try {
-                     t.cancel(false);
-                 } catch (Throwable ignore) {
+             if (w != null) {              // cancel remaining tasks
+                 for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
+                     try {
+                         t.cancel(false);
+                     } catch (Throwable ignore) {
+                     }
                  }
              }
          }
          if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
              WorkQueue[] qs; int n, i;     // remove index unless terminating
              long ns = w.nsteals & 0xffffffffL;
-             int stop = lockRunState() & STOP;
-             if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0 &&
-                 qs[i = phase & SMASK & (n - 1)] == w) {
+             if ((lockRunState() & STOP) != 0)
+                 replaceable = false;
+             else if ((qs = queues) != null && (n = qs.length) > 0 &&
+                      qs[i = phase & SMASK & (n - 1)] == w) {
                  qs[i] = null;
                  stealCount += ns;         // accumulate steals
              }
              unlockRunState();
+             if (replaceable)
+                 signalWork(null, 0);
          }
-         if ((runState & STOP) == 0 && replaceable)
-             signalWork(null, 0); // may replace unless trimmed or uninitialized
          if (ex != null)
              ForkJoinTask.rethrow(ex);
      }
  
      /**

@@ -1922,101 +1108,107 @@
       */
      final void signalWork(ForkJoinTask<?>[] a, int k) {
          int pc = parallelism;
          for (long c = ctl;;) {
              WorkQueue[] qs = queues;
-             long ac = (c + RC_UNIT) & RC_MASK, nc;
-             int sp = (int)c, i = sp & SMASK;
-             if (qs == null || qs.length <= i)
+             if (a != null && a.length > k && k >= 0 && a[k] == null)
                  break;
-             WorkQueue w = qs[i], v = null;
-             if (sp == 0) {
-                 if ((short)(c >>> TC_SHIFT) >= pc)
-                     break;
-                 nc = ((c + TC_UNIT) & TC_MASK);
+             boolean done = false;
+             WorkQueue v = null;
+             long nc = 0L, ac = (c + RC_UNIT) & RC_MASK;
+             int sp = (int)c, i = sp & SMASK;
+             if ((short)(c >>> RC_SHIFT) >= pc || qs == null || qs.length <= i)
+                 done = true;
+             else {
+                 WorkQueue w = qs[i];
+                 if (sp == 0) {
+                     if ((short)(c >>> TC_SHIFT) >= pc)
+                         done = true;
+                     else
+                         nc = ac | ((c + TC_UNIT) & TC_MASK);
+                 }
+                 else if ((v = w) == null)
+                     done = true;
+                 else
+                     nc = ac | (c & TC_MASK) | (v.stackPred & LMASK);
              }
-             else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null)
-                 break;
-             else
-                 nc = (v.stackPred & LMASK) | (c & TC_MASK);
-             if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
-                 if (v == null)
-                     createWorker();
-                 else {
-                     v.phase = sp;
-                     if (v.parking != 0)
-                         U.unpark(v.owner);
+             if (c == (c = ctl)) {        // confirm
+                 if (done)
+                     break;
+                 else if (c == (c = compareAndExchangeCtl(c, nc))) {
+                     if (v == null)
+                         createWorker();
+                     else {
+                         v.phase = sp;
+                         if (v.parking != 0)
+                             U.unpark(v.owner);
+                     }
+                     break;
                  }
-                 break;
              }
-             if (a != null && k >= 0 && k < a.length && a[k] == null)
-                 break;
          }
      }
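
signalWork either pops and unparks the idle worker at the top of the ctl stack or, when below target parallelism, creates a new one. A simplified sketch of such a packed idle-worker stack, assuming an illustrative layout (high half: active count; low half: 1 + index of the top waiter, 0 when empty) rather than the real four-field ctl encoding:

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.LockSupport;

    // Illustrative Treiber stack of idle workers packed into one long.
    // Recording the previous top in each waiter's stackPred lets one CAS
    // on the shared word both pop the stack and adjust the count.
    final class IdleStackSketch {
        static final class Waiter {
            final Thread owner = Thread.currentThread();
            volatile int stackPred;            // link to next idle waiter
        }
        final Waiter[] slots = new Waiter[64];
        final AtomicLong ctl = new AtomicLong();

        void deactivate(int id, Waiter w) {    // push self, active count - 1
            slots[id] = w;
            for (long c = ctl.get();;) {
                w.stackPred = (int) c;         // old top becomes successor
                long nc = (((c >>> 32) - 1) << 32) | ((id + 1) & 0xffffffffL);
                if (c == (c = ctl.compareAndExchange(c, nc)))
                    break;
            }
        }

        void signal() {                        // pop and unpark top waiter
            for (long c; (int) (c = ctl.get()) != 0; ) {
                Waiter v = slots[(int) c - 1];
                long nc = (((c >>> 32) + 1) << 32)
                          | (v.stackPred & 0xffffffffL);
                if (ctl.compareAndSet(c, nc)) {
                    LockSupport.unpark(v.owner);
                    break;
                }
            }
        }
    }
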
  
      /**
-      * Reactivates the given worker, and possibly others if not top of
-      * ctl stack. Called only during shutdown to ensure release on
-      * termination.
+      * Releases all idle workers on the ctl stack. Called only during
+      * shutdown to ensure release on termination.
       */
-     private void reactivate(WorkQueue w) {
+     private void releaseAll() {
          for (long c = ctl;;) {
              WorkQueue[] qs; WorkQueue v; int sp, i;
-             if ((qs = queues) == null || (sp = (int)c) == 0 ||
-                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null ||
-                 (v != w && w != null && (w.phase & IDLE) == 0))
+             if ((sp = (int)c) == 0 || (qs = queues) == null ||
+                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
                  break;
              if (c == (c = compareAndExchangeCtl(
                            c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
                                (v.stackPred & LMASK))))) {
                  v.phase = sp;
-                 if (v == w)
-                     break;
                  if (v.parking != 0)
                      U.unpark(v.owner);
              }
          }
      }
  
      /**
       * Internal version of isQuiescent and related functionality.
-      * @return true if terminating or all workers are inactive and
-      * submission queues are empty and unlocked; if so, setting STOP
-      * if shutdown is enabled
+      * @return positive if stopping or terminating (setting STOP when
+      * quiescent and shutdown is enabled); zero if all workers are
+      * inactive and submission queues are empty and unlocked but
+      * shutdown is not enabled; negative if any worker is active or
+      * any queue is nonempty
       */
-     private boolean quiescent() {
+     private int quiescent() {
          outer: for (;;) {
              long phaseSum = 0L;
              boolean swept = false;
              for (int e, prevRunState = 0; ; prevRunState = e) {
                  long c = ctl;
                  if (((e = runState) & STOP) != 0)
-                     return true;                          // terminating
+                     return 1;                         // terminating
                  else if ((c & RC_MASK) > 0L)
-                     return false;                         // at least one active
+                     return -1;                        // at least one active
                  else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
                      long sum = c;
                      WorkQueue[] qs = queues; WorkQueue q;
                      int n = (qs == null) ? 0 : qs.length;
                      for (int i = 0; i < n; ++i) {         // scan queues
                          if ((q = qs[i]) != null) {
                              int p = q.phase, s = q.top, b = q.base;
                              sum += (p & 0xffffffffL) | ((long)b << 32);
                              if ((p & IDLE) == 0 || s - b > 0) {
                                  if ((i & 1) == 0 && compareAndSetCtl(c, c))
-                                     signalWork(null, 0);  // ensure live
-                                 return false;
+                                     signalWork(q.array, q.base);
+                                 return -1;
                              }
                          }
                      }
                      swept = (phaseSum == (phaseSum = sum));
                  }
                  else if ((e & SHUTDOWN) == 0)
-                     return true;
+                     return 0;
                  else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
-                     interruptAll();                       // confirmed
-                     return true;                          // enable termination
+                     releaseAll();                         // confirmed
+                     return 1;                             // enable termination
                  }
                  else
                      break;                                // restart
              }
          }
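
The sweep above only reports quiescence when two consecutive scans observe no active workers, no queued tasks, and an identical checksum over per-queue state, so a task migrating between queues mid-scan cannot be missed. A self-contained sketch of that stable-checksum pattern (the fields are stand-ins, not the real pool state):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicIntegerArray;

    // Stable-checksum quiescence sweep: quiescent only if two consecutive
    // scans see no activity and the same folded per-queue checksum.
    final class QuiescenceSketch {
        final AtomicInteger active = new AtomicInteger();  // running workers
        final AtomicIntegerArray top, base;                // per-queue indices
        QuiescenceSketch(int n) {
            top = new AtomicIntegerArray(n);
            base = new AtomicIntegerArray(n);
        }

        int quiescent() {                     // 0 if quiescent, -1 if not
            long prevSum = -1L;
            for (;;) {
                if (active.get() > 0)
                    return -1;                       // a worker is running
                long sum = 0L;
                for (int i = 0; i < top.length(); ++i) {
                    int t = top.get(i), b = base.get(i);
                    if (t - b > 0)
                        return -1;                   // nonempty queue
                    sum = sum * 31 + t + ((long) b << 32);
                }
                if (sum == prevSum)
                    return 0;                        // unchanged: stable
                prevSum = sum;                       // else sweep again
            }
        }
    }
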

@@ -2028,146 +1220,165 @@
       *
       * @param w caller's WorkQueue (may be null on failed initialization)
       */
      final void runWorker(WorkQueue w) {
          if (w != null) {
-             int phase = w.phase, r = w.stackPred; // seed from registerWorker
-             long window = (long)((r >>> 16) & SMASK) | NO_HISTORY;
+             int cfg = w.config & (FIFO|CLEAR_TLS), r = w.stackPred;
+             long stat;
              do {
-                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
-             } while ((runState & STOP) == 0 &&
-                      (((window = scan(w, window, r)) < 0L ||
-                        ((phase = awaitWork(w, phase)) & IDLE) == 0)));
+                 r = (int)(stat = scan(w, r, cfg));
+             } while ((int)(stat >>> 32) == 0 ||
+                      (quiescent() <= 0 && awaitWork(w) == 0));
          }
      }
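
scan now packs two results into its long return value: the low half carries the next xorshift seed and the high half a status telling runWorker whether to keep rescanning. A tiny sketch of that packing convention (names hypothetical):

    // Hypothetical sketch of the packed-return convention used by scan:
    // high 32 bits = status, low 32 bits = next random seed.
    final class PackedReturnSketch {
        static long scanResult(int status, int seed) {
            return ((long) status << 32) | (seed & 0xffffffffL);
        }
        public static void main(String[] args) {
            long stat = scanResult(1, 0x9e3779b9);
            int status = (int) (stat >>> 32);   // 1: ran out of work
            int seed   = (int) stat;            // seed for the next scan
            System.out.println(status + " " + Integer.toHexString(seed));
        }
    }
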
  
      /**
-      * Scans for and if found executes top-level tasks: Tries to poll
-      * each queue starting at initial index with random stride,
-      * returning next scan window and retry indicator.
+      * Scans for and, if found, executes a top-level task.
       *
       * @param w caller's WorkQueue
-      * @param window up to three queue indices
-      * @param r random seed
-      * @return the next window to use, with RESCAN set for rescan
+      * @param r random seed
+      * @param cfg config bits
+      * @return retry status and seed for next use
       */
-     private long scan(WorkQueue w, long window, int r) {
-         WorkQueue[] qs = queues;
-         int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
-         long next = window & ~RESCAN;
-         outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
-             int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
-             if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
-                 (a = q.array) != null && (cap = a.length) > 0) {
-                 for (int b = q.base;;) {
-                     int nb = b + 1, nk = nb & (cap - 1), k;
-                     ForkJoinTask<?> t = a[k = b & (cap - 1)];
-                     U.loadFence();                // re-read b and t
-                     if (b == (b = q.base)) {      // else inconsistent; retry
-                         Object o;
-                         if (t == null)
-                             o = a[k];
-                         else if (t == (o = U.compareAndExchangeReference(
-                                            a, slotOffset(k), t, null))) {
-                             q.updateBase(nb);
-                             next = RESCAN | ((window << 16) & HMASK) | j;
-                             if (window != next && a[nk] != null)
-                                 signalWork(a, nk); // limit propagation
-                             if (w != null)        // always true
-                                 w.topLevelExec(t, q, j);
-                             break outer;
-                         }
-                         if (o == null) {
-                             if (next >= 0L && a[nk] != null)
-                                 next |= RESCAN;
+     private long scan(WorkQueue w, int r, int cfg) {
+         int spinScans = 0;                        // to rescan after deactivate
+         while (w != null && (runState & STOP) == 0) {
+             WorkQueue[] qs = queues;
+             int n = (qs == null) ? 0 : qs.length;
+             long prevCtl = ctl;                   // for signal check
+             int phase = w.phase;                  // IDLE set when deactivated
+             r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // advance xorshift
+             int i = r, step = (r >>> 16) | 1;     // scan random permutation
+             int stalls = 0;                       // move and restart if stuck
+             for (int l = n; l > 0; --l, i += step) {
+                 int j; WorkQueue q;
+                 if ((q = qs[j = i & SMASK & (n - 1)]) != null) {
+                     for (;;) {
+                         int cap, b, k; long kp; ForkJoinTask<?>[] a;
+                         if ((a = q.array) == null || (cap = a.length) <= 0)
                              break;
+                         ForkJoinTask<?> t = a[k = (cap - 1) & (b = q.base)];
+                         U.loadFence();
+                         if (q.base != b)
+                             continue;
+                         int nb = b + 1, nk = nb & (cap - 1);
+                         if (U.getReference(a, kp = slotOffset(k)) != t)
+                             ;                     // screen CAS
+                         else if (t == null) {     // check if empty
+                             if (a[nk] == null &&
+                                 a[(nb + 1) & (cap - 1)] == null) {
+                                 if (q.top - b <= 0)
+                                     break;        // probe slots as filter
+                             }
+                             else if (++stalls > n)
+                                 return r & LMASK; // restart to randomly move
+                             else if (stalls != 1)
+                                 Thread.onSpinWait();
+                         }
+                         else if ((phase & IDLE) != 0) { // recheck or reactivate
+                             long sp = w.stackPred & LMASK, sc; int np;
+                             if (((phase = w.phase) & IDLE) != 0) {
+                                 if ((np = phase + 1) != (int)(sc = ctl))
+                                     break;        // ineligible
+                                 if (compareAndSetCtl(
+                                         sc, sp | ((sc + RC_UNIT) & UMASK)))
+                                     w.phase = np;
+                             }
+                             return r & LMASK;     // restart
+                         }
+                         else if (U.compareAndSetReference(a, kp, t, null)) {
+                             q.base = nb;
+                             w.setSource(j);       // fully fenced
+                             signalWork(a, nk);    // signal if a[nk] nonnull
+                             w.topLevelExec(t, q, nb, cfg);
+                             return r & LMASK;
+                         }
+                     }
+                 }
+             }
+             if (w.phase == phase) {
+                 int ac; long c;                   // avoid missed signals
+                 if (((ac = (short)((c = ctl) >>> RC_SHIFT)) <= 0 ||
+                      c == prevCtl || ac < (short)(prevCtl >>> RC_SHIFT))) {
+                     if ((phase & IDLE) == 0) {    // try to deactivate
+                         long ap = (phase + (IDLE << 1)) & LMASK;
+                         spinScans = 0;
+                         w.stackPred = (int)c;     // set ctl stack link
+                         w.phase = phase | IDLE;
+                         while (c != (c = compareAndExchangeCtl(
+                                          c, ap | ((c - RC_UNIT) & UMASK)))) {
+                             if (ac <= (ac = (short)(c >>> RC_SHIFT))) {
+                                 w.phase = phase;  // nondecreasing; back out
+                                 break;
+                             }
+                             w.stackPred = (int)c; // retry
                          }
                      }
+                     else if (ac <= 0 || (spinScans += ac) >= SPIN_WAITS)
+                         break;
                  }
              }
          }
-         return next;
+         return (1L << 32) | (r & LMASK);
      }
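
The traversal order in scan relies on a standard trick: with a power-of-two table, any odd step size is coprime with the length, so i += step visits every slot exactly once per pass, while different seeds give different starting points and permutations. A self-contained demonstration:

    // Demonstrates the queue-visitation order used by scan: an odd step
    // over a power-of-two table covers every index exactly once.
    final class ScanOrderSketch {
        public static void main(String[] args) {
            int n = 16;                               // table size (power of two)
            int r = 0x6d2b79f5;                       // some nonzero seed
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift advance
            int i = r, step = (r >>> 16) | 1;         // odd => coprime with n
            boolean[] seen = new boolean[n];
            for (int l = n; l > 0; --l, i += step)
                seen[i & (n - 1)] = true;
            for (boolean s : seen)
                if (!s) throw new AssertionError("missed a slot");
            System.out.println("visited all " + n + " slots exactly once");
        }
    }
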
  
      /**
-      * Tries to inactivate, and if successful, awaits signal or termination.
+      * Awaits signal or termination.
       *
-      * @param w the worker (may be null if already terminated)
-      * @param phase current phase
-      * @return current phase, with IDLE set if worker should exit
+      * @param w the WorkQueue (may be null if already terminated)
+      * @return nonzero for exit
       */
-     private int awaitWork(WorkQueue w, int phase) {
-         boolean quiet;                           // true if possibly quiescent
-         int active = phase + (IDLE << 1), p = phase | IDLE, e;
-         if (w != null) {
-             w.phase = p;                         // deactivate
-             long np = active & LMASK, pc = ctl;  // try to enqueue
-             long qc = np | ((pc - RC_UNIT) & UMASK);
-             w.stackPred = (int)pc;               // set ctl stack link
-             if (pc != (pc = compareAndExchangeCtl(pc, qc))) {
-                 qc = np | ((pc - RC_UNIT) & UMASK);
-                 w.stackPred = (int)pc;           // retry once
-                 if (pc != (pc = compareAndExchangeCtl(pc, qc)))
-                     p = w.phase = phase;         // back out
+     private int awaitWork(WorkQueue w) {
+         int p = IDLE, phase;
+         if (w != null && (p = (phase = w.phase) & IDLE) != 0) {
+             int nextPhase = phase + IDLE;
+             long deadline = 0L, c;         // set if all idle and w is ctl top
+             if (((c = ctl) & RC_MASK) <= 0L && (int)c == nextPhase) {
+                 int np = parallelism, nt = (short)(c >>> TC_SHIFT);
+                 long delay = keepAlive;    // scale if not fully populated
+                 if (nt != (nt = Math.max(nt, np)) && nt > 0)
+                     delay = Math.max(TIMEOUT_SLOP, delay / nt);
+                 long d = delay + System.currentTimeMillis();
+                 deadline = (d == 0L) ? 1L : d;
              }
-             if (p != phase && ((e = runState) & STOP) == 0 &&
-                 (!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 ||
-                  !(quiet = quiescent()) || (runState & STOP) == 0)) {
-                 long deadline = 0L;              // not terminating
-                 if (quiet) {                     // use timeout if trimmable
-                     int nt = (short)(qc >>> TC_SHIFT);
-                     long delay = keepAlive;      // scale if not at target
-                     if (nt != (nt = Math.max(nt, parallelism)) && nt > 0)
-                         delay = Math.max(TIMEOUT_SLOP, delay / nt);
-                     if ((deadline = delay + System.currentTimeMillis()) == 0L)
-                         deadline = 1L;           // avoid zero
-                 }
-                 boolean release = quiet;
-                 WorkQueue[] qs = queues;         // recheck queues
-                 int n = (qs == null) ? 0 : qs.length;
-                 for (int l = -n, j = active; l < n; ++l, ++j) {
-                     WorkQueue q; ForkJoinTask<?>[] a; int cap;
-                     if ((p = w.phase) == active) // interleave signal checks
-                         break;
-                     if ((q = qs[j & (n - 1)]) != null &&
-                         (a = q.array) != null && (cap = a.length) > 0 &&
-                         a[q.base & (cap - 1)] != null) {
-                         if (release && qc == ctl && compareAndSetCtl(qc, pc)) {
-                             p = w.phase = active;
-                             break;               // possible missed signal
-                         }
-                         release = true;          // track multiple or reencounter
-                     }
-                     Thread.onSpinWait();         // reduce memory traffic
-                 }
-                 if (p != active) {               // emulate LockSupport.park
-                     LockSupport.setCurrentBlocker(this);
-                     w.parking = 1;
-                     for (;;) {
-                         if ((runState & STOP) != 0 || (p = w.phase) == active)
-                             break;
-                         U.park(deadline != 0L, deadline);
-                         if ((p = w.phase) == active || (runState & STOP) != 0)
-                             break;
-                         Thread.interrupted();    // clear for next park
-                         if (deadline != 0L && TIMEOUT_SLOP >
-                             deadline - System.currentTimeMillis()) {
-                             long sp = w.stackPred & LMASK, c = ctl;
-                             long nc = sp | (UMASK & (c - TC_UNIT));
-                             if (((int)c & SMASK) == (active & SMASK) &&
-                                 compareAndSetCtl(c, nc)) {
-                                 w.source = DEREGISTERED;
-                                 w.phase = active;
-                                 break;           // trimmed on timeout
-                             }
-                             deadline = 0L;       // no longer trimmable
+             LockSupport.setCurrentBlocker(this);
+             w.parking = 1;                 // enable unpark
+             for (;;) {                     // emulate LockSupport.park
+                 if ((runState & STOP) != 0)
+                     break;
+                 if ((p = w.phase & IDLE) == 0)
+                     break;
+                 U.park(deadline != 0L, deadline);
+                 if ((p = w.phase & IDLE) == 0)
+                     break;
+                 if ((runState & STOP) != 0)
+                     break;
+                 Thread.interrupted();      // clear for next park
+                 if (deadline != 0L &&      // try to trim
+                     deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
+                     long sp = w.stackPred & LMASK, dc = ctl;
+                     long nc = sp | (UMASK & (dc - TC_UNIT));
+                     if ((int)dc == nextPhase && compareAndSetCtl(dc, nc)) {
+                         WorkQueue[] qs; WorkQueue v; int vp, i;
+                         w.source = DEREGISTERED;
+                         w.phase = nextPhase; // try to wake up next waiter
+                         if ((vp = (int)nc) != 0 && (qs = queues) != null &&
+                             qs.length > (i = vp & SMASK) &&
+                             (v = qs[i]) != null &&
+                             compareAndSetCtl(nc, ((UMASK & (nc + RC_UNIT)) |
+                                                   (nc & TC_MASK) |
+                                                   (v.stackPred & LMASK)))) {
+                             v.phase = vp;
+                             U.unpark(v.owner);
                          }
+                         break;
                      }
-                     w.parking = 0;
-                     LockSupport.setCurrentBlocker(null);
+                     deadline = 0L;         // no longer trimmable
                  }
              }
+             w.parking = 0;                 // disable unpark
+             LockSupport.setCurrentBlocker(null);
          }
          return p;
      }
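
The body of awaitWork emulates LockSupport.park so it can recheck phase and runState between wakeups, clear stray interrupts, and honor a trim deadline. A simplified version of the same idiom using LockSupport directly (the released supplier and the deadline stand in for the phase and ctl checks):

    import java.util.concurrent.locks.LockSupport;
    import java.util.function.BooleanSupplier;

    // Simplified park-until-signalled-or-deadline loop, mirroring the
    // structure of awaitWork; 'released' stands in for the phase check.
    final class ParkSketch {
        // Returns true if released by a signal, false on deadline timeout.
        static boolean await(BooleanSupplier released, long deadlineMillis) {
            boolean timed = deadlineMillis != 0L;
            while (!released.getAsBoolean()) {
                if (timed)
                    LockSupport.parkUntil(deadlineMillis); // absolute time
                else
                    LockSupport.park();
                Thread.interrupted();          // clear for next park
                if (timed && System.currentTimeMillis() >= deadlineMillis)
                    return false;              // timed out; caller may trim
            }
            return true;                       // signalled
        }
    }
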
  
      /**

@@ -2185,11 +1396,11 @@
                  r &= ~1;
              int step = (submissionsOnly) ? 2 : 1;
              if ((qs = queues) != null && (n = qs.length) > 0) {
                  for (int i = n; i > 0; i -= step, r += step) {
                      if ((q = qs[r & (n - 1)]) != null &&
-                         (t = q.poll(this)) != null)
+                         (t = q.poll()) != null)
                          return t;
                  }
              }
          }
          return null;

@@ -2521,22 +1732,22 @@
       * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
       * @param interruptible true if return on interrupt
       * @return positive if quiescent, negative if interrupted, else 0
       */
      private int externalHelpQuiesce(long nanos, boolean interruptible) {
-         if (!quiescent()) {
+         if (quiescent() < 0) {
              long startTime = System.nanoTime();
              long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
              for (int waits = 0;;) {
                  ForkJoinTask<?> t;
                  if (interruptible && Thread.interrupted())
                      return -1;
                  else if ((t = pollScan(false)) != null) {
                      waits = 0;
                      t.doExec();
                  }
-                 else if (quiescent())
+                 else if (quiescent() >= 0)
                      break;
                  else if (System.nanoTime() - startTime > nanos)
                      return 0;
                  else if (waits == 0)
                      waits = MIN_SLEEP;

@@ -2601,11 +1812,11 @@
              if ((qs = queues) == null)
                  break;
              if ((n = qs.length) <= 0)
                  break;
              if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
-                 WorkQueue w = new WorkQueue(null, id, 0, false);
+                 WorkQueue w = new WorkQueue(null, id, (int)config, false);
                  w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
                  int stop = lockRunState() & STOP;
                  if (stop == 0 && queues == qs && qs[i] == null)
                      q = qs[i] = w;                   // else discard; retry
                  unlockRunState();

@@ -2757,69 +1968,67 @@
       * if no work and no active workers
       * @param enable if true, terminate when next possible
       * @return runState on exit
       */
      private int tryTerminate(boolean now, boolean enable) {
-         int e = runState;
+         int e = runState, isShutdown;
          if ((e & STOP) == 0) {
              if (now) {
-                 int s = lockRunState();
-                 runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
-                 if ((s & STOP) == 0)
-                     interruptAll();
+                 runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN;
+                 releaseAll();
              }
-             else {
-                 int isShutdown = (e & SHUTDOWN);
-                 if (isShutdown == 0 && enable)
-                     getAndBitwiseOrRunState(isShutdown = SHUTDOWN);
-                 if (isShutdown != 0)
-                     quiescent();                 // may trigger STOP
-                 e = runState;
+             else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) {
+                 if (isShutdown == 0)
+                     getAndBitwiseOrRunState(SHUTDOWN);
+                 if (quiescent() > 0)
+                     e = runState;
              }
          }
-         if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
-             int r = (int)Thread.currentThread().threadId(); // stagger traversals
-             WorkQueue[] qs = queues;
-             int n = (qs == null) ? 0 : qs.length;
-             for (int l = n; l > 0; --l, ++r) {
-                 int j = r & SMASK & (n - 1); WorkQueue q; ForkJoinTask<?> t;
-                 while ((q = qs[j]) != null && q.source != DEREGISTERED &&
-                        (t = q.poll(null)) != null) {
-                     try {
-                         t.cancel(false);
-                     } catch (Throwable ignore) {
-                     }
-                 }
+         if ((e & (STOP | TERMINATED)) == STOP) {
+             if ((ctl & RC_MASK) > 0L) {         // avoid if quiescent shutdown
+                 helpTerminate(now);
+                 e = runState;
              }
-             if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
+             if ((e & TERMINATED) == 0 && ctl == 0L) {
+                 e |= TERMINATED;
                  if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
                      CountDownLatch done; SharedThreadContainer ctr;
                      if ((done = termination) != null)
                          done.countDown();
                      if ((ctr = container) != null)
                          ctr.close();
                  }
-                 e = runState;
              }
          }
          return e;
      }
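
tryTerminate advances the lifecycle monotonically through the SHUTDOWN, STOP, and TERMINATED bits of runState, and the getAndBitwiseOrRunState idiom guarantees that exactly one thread observes the 0-to-1 transition of TERMINATED and performs the final cleanup. A sketch of that idiom using AtomicInteger.getAndAccumulate as a stand-in (bit values illustrative):

    import java.util.concurrent.atomic.AtomicInteger;

    // Monotonic lifecycle bits: the first thread to set a bit (and only
    // that thread) sees it clear in the returned previous value.
    final class LifecycleSketch {
        static final int SHUTDOWN = 1, STOP = 2, TERMINATED = 4; // illustrative
        final AtomicInteger runState = new AtomicInteger();

        int orRunState(int bits) {             // getAndBitwiseOrRunState analogue
            return runState.getAndAccumulate(bits, (s, b) -> s | b);
        }

        void terminate() {
            orRunState(SHUTDOWN | STOP);       // bits only ever get set
            if ((orRunState(TERMINATED) & TERMINATED) == 0)
                System.out.println("first to terminate: run cleanup once");
        }
    }
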
  
      /**
-      * Interrupts all workers
+      * Cancels tasks and interrupts workers
       */
-     private void interruptAll() {
+     private void helpTerminate(boolean now) {
          Thread current = Thread.currentThread();
+         int r = (int)current.threadId();   // stagger traversals
          WorkQueue[] qs = queues;
          int n = (qs == null) ? 0 : qs.length;
-         for (int i = 1; i < n; i += 2) {
-             WorkQueue q; Thread o;
-             if ((q = qs[i]) != null && (o = q.owner) != null && o != current &&
-                 q.source != DEREGISTERED) {
-                 try {
-                     o.interrupt();
-                 } catch (Throwable ignore) {
+         for (int l = n; l > 0; --l, ++r) {
+             WorkQueue q; ForkJoinTask<?> t; Thread o;
+             int j = r & SMASK & (n - 1);
+             if ((q = qs[j]) != null && q.source != DEREGISTERED) {
+                 while ((t = q.poll()) != null) {
+                     try {
+                         t.cancel(false);
+                     } catch (Throwable ignore) {
+                     }
+                 }
+                 if ((r & 1) != 0 && (o = q.owner) != null &&
+                     o != current && q.source != DEREGISTERED &&
+                     (now || !o.isInterrupted())) {
+                     try {
+                         o.interrupt();
+                     } catch (Throwable ignore) {
+                     }
                  }
              }
          }
      }
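
Deriving the traversal start from the caller's thread id staggers concurrent terminators across the queue table instead of having them all contend on slot 0; the masked increment still visits each slot exactly once. For example:

    // Staggered traversal start: different helpers begin at different
    // slots, yet each covers the whole power-of-two table once.
    final class StaggerSketch {
        public static void main(String[] args) {
            int n = 8;                                  // queues.length
            int r = (int) Thread.currentThread().threadId();
            for (int l = n; l > 0; --l, ++r)
                System.out.print((r & (n - 1)) + " ");  // each slot once
            System.out.println();
        }
    }
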
  

@@ -3504,11 +2713,11 @@
       * threads remain inactive.
       *
       * @return {@code true} if all threads are currently idle
       */
      public boolean isQuiescent() {
-         return quiescent();
+         return quiescent() >= 0;
      }
  
      /**
       * Returns an estimate of the total number of completed tasks that
       * were executed by a thread other than their submitter. The

@@ -3991,19 +3200,21 @@
      /**
       * Invokes tryCompensate to create or re-activate a spare thread to
       * compensate for a thread that performs a blocking operation. When the
       * blocking operation completes, endCompensatedBlock must be invoked
       * with the value returned by this method to re-adjust the parallelism.
+      * @return value to use in endCompensatedBlock
       */
      final long beginCompensatedBlock() {
          int c;
          do {} while ((c = tryCompensate(ctl)) < 0);
          return (c == 0) ? 0L : RC_UNIT;
      }
  
      /**
       * Re-adjusts parallelism after a blocking operation completes.
+      * @param post value from beginCompensatedBlock
       */
      void endCompensatedBlock(long post) {
          if (post > 0L) {
              getAndAddCtl(post);
          }
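
Together the pair forms a begin/try/finally/end protocol: the token from beginCompensatedBlock records whether parallelism was adjusted, and endCompensatedBlock must see that token even on exceptional exit. A standalone sketch of the calling shape (these pool methods are internal, so the class below is purely illustrative):

    import java.util.concurrent.atomic.AtomicLong;

    // Standalone sketch of the begin/end compensation protocol; only the
    // calling shape mirrors the real code, all names are illustrative.
    final class CompensatedBlockSketch {
        static final long RC_UNIT = 1L;              // illustrative unit
        final AtomicLong spareCount = new AtomicLong();

        long beginBlock() {                          // may add a spare
            spareCount.incrementAndGet();
            return RC_UNIT;                          // token for endBlock
        }

        void endBlock(long post) {
            if (post > 0L)
                spareCount.addAndGet(-post);         // reverse adjustment
        }

        void runBlocking(Runnable op) {
            long post = beginBlock();
            try {
                op.run();                            // the blocking operation
            } finally {
                endBlock(post);                      // always restore
            }
        }
    }
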
< prev index next >