
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java


  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.reflect.Field;
  40 import java.security.AccessController;
  41 import java.security.AccessControlContext;
  42 import java.security.Permission;
  43 import java.security.Permissions;
  44 import java.security.PrivilegedAction;
  45 import java.security.ProtectionDomain;
  46 import java.util.ArrayList;
  47 import java.util.Collection;
  48 import java.util.Collections;
  49 import java.util.List;
  50 import java.util.Objects;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.CountDownLatch;
  53 import java.util.concurrent.locks.LockSupport;
  54 import jdk.internal.access.JavaUtilConcurrentFJPAccess;
  55 import jdk.internal.access.SharedSecrets;
  56 import jdk.internal.misc.Unsafe;
  57 import jdk.internal.vm.SharedThreadContainer;

  58 
  59 /**
  60  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  61  * A {@code ForkJoinPool} provides the entry point for submissions
  62  * from non-{@code ForkJoinTask} clients, as well as management and
  63  * monitoring operations.
  64  *
  65  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  66  * ExecutorService} mainly by virtue of employing
  67  * <em>work-stealing</em>: all threads in the pool attempt to find and
  68  * execute tasks submitted to the pool and/or created by other active
  69  * tasks (eventually blocking waiting for work if none exist). This
  70  * enables efficient processing when most tasks spawn other subtasks
  71  * (as do most {@code ForkJoinTask}s), as well as when many small
  72  * tasks are submitted to the pool from external clients.  Especially
  73  * when setting <em>asyncMode</em> to true in constructors, {@code
  74  * ForkJoinPool}s may also be appropriate for use with event-style
  75  * tasks that are never joined. All worker threads are initialized
  76  * with {@link Thread#isDaemon} set {@code true}.
  77  *

 166  * Upon any error in establishing these settings, default parameters
 167  * are used. It is possible to disable or limit the use of threads in
 168  * the common pool by setting the parallelism property to zero, and/or
 169  * using a factory that may return {@code null}. However, doing so may
 170  * cause unjoined tasks to never be executed.
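 *
 * <p>For example (an illustrative command line; {@code MyApp} is a
 * placeholder), the common pool can be restricted to caller-runs-only
 * operation by starting the JVM with its parallelism property set to zero:
 *
 * <pre> {@code
 * java -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 MyApp}</pre>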
 171  *
 172  * @implNote This implementation restricts the maximum number of
 173  * running threads to 32767. Attempts to create pools with greater
 174  * than the maximum number result in {@code
 175  * IllegalArgumentException}. Also, this implementation rejects
 176  * submitted tasks (that is, by throwing {@link
 177  * RejectedExecutionException}) only when the pool is shut down or
 178  * internal resources have been exhausted.
 179  *
 180  * @since 1.7
 181  * @author Doug Lea
 182  */
 183 public class ForkJoinPool extends AbstractExecutorService {
 184 
 185     /*
 186      * Implementation Overview
 187      *
 188      * This class and its nested classes provide the main
 189      * functionality and control for a set of worker threads.  Because
 190      * most internal methods and nested classes are interrelated,
 191      * their main rationale and descriptions are presented here;
 192      * individual methods and nested classes contain only brief
 193      * comments about details. Broadly: submissions from non-FJ
 194      * threads enter into submission queues.  Workers take these tasks
 195      * and typically split them into subtasks that may be stolen by
 196      * other workers. Work-stealing based on randomized scans
 197      * generally leads to better throughput than "work dealing" in
 198      * which producers assign tasks to idle threads, in part because
 199      * threads that have finished other tasks before the signalled
 200      * thread wakes up (which can be a long time) can take the task
 201      * instead.  Preference rules give first priority to processing
 202      * tasks from their own queues (LIFO or FIFO, depending on mode),
 203      * then to randomized FIFO steals of tasks in other queues.  This
 204      * framework began as a vehicle for supporting tree-structured
 205      * parallelism using work-stealing.  Over time, its scalability
 206      * advantages led to extensions and changes to better support more
 207      * diverse usage contexts.  Here's a brief history of major
 208      * revisions, each also with other minor features and changes.
 209      *
 210      * 1. Only handle recursively structured computational tasks
 211      * 2. Async (FIFO) mode and striped submission queues
 212      * 3. Completion-based tasks (mainly CountedCompleters)
 213      * 4. CommonPool and parallelStream support
 214      * 5. InterruptibleTasks for externally submitted tasks
 215      *
 216      * Most changes involve adaptations of base algorithms using
 217      * combinations of static and dynamic bitwise mode settings (both
 218      * here and in ForkJoinTask), and subclassing of ForkJoinTask.
 219      * There are a fair number of odd code constructions and design
 220      * decisions for components that reside at the edge of Java vs JVM
 221      * functionality.
 222      *
 223      * WorkQueues
 224      * ==========
 225      *
 226      * Most operations occur within work-stealing queues (in nested
 227      * class WorkQueue).  These are special forms of Deques that
 228      * support only three of the four possible end-operations -- push,
 229      * pop, and poll (aka steal), under the further constraints that
 230      * push and pop are called only from the owning thread (or, as
 231      * extended here, under a lock), while poll may be called from
 232      * other threads.  (If you are unfamiliar with them, you probably
 233      * want to read Herlihy and Shavit's book "The Art of
 234      * Multiprocessor Programming", chapter 16 describing these in
 235      * more detail before proceeding.)  The main work-stealing queue
 236      * design is roughly similar to those in the papers "Dynamic
 237      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
 238      * (http://research.sun.com/scalable/pubs/index.html) and
 239      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
 240      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
 241      * The main differences ultimately stem from GC requirements that
 242      * we null out taken slots as soon as we can, to maintain as small
 243      * a footprint as possible even in programs generating huge
 244      * numbers of tasks. To accomplish this, we shift the CAS
 245      * arbitrating pop vs poll (steal) from being on the indices
 246      * ("base" and "top") to the slots themselves. These provide the
 247      * primary required memory ordering -- see "Correct and Efficient
 248      * Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and
 249      * Nardelli, PPoPP 2013
 250      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
 251      * analysis of memory ordering requirements in work-stealing
 252      * algorithms similar to the one used here.  We use per-operation
 253      * ordered writes of various kinds for updates, but usually use
 254      * explicit load fences for reads, to cover access of several
 255      * fields of possibly several objects without further constraining
 256      * read-by-read ordering.
 257      *
 258      * We also support a user mode in which local task processing is
 259      * in FIFO, not LIFO order, simply by using a local version of
 260      * poll rather than pop.  This can be useful in message-passing
 261      * frameworks in which tasks are never joined, although with
 262      * increased contention among task producers and consumers. Also,
 263      * the same data structure (and class) is used for "submission
 264      * queues" (described below) holding externally submitted tasks,
 265      * which differ only in that a lock (using field "phase"; see below) is
 266      * required by external callers to push and pop tasks.
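     *
     * As a usage-level illustration of this FIFO ("async") mode, a pool
     * can be constructed with asyncMode set to true using only public
     * APIs (a minimal sketch; the parallelism shown is arbitrary):
     *
     *    ForkJoinPool asyncPool = new ForkJoinPool(
     *        Runtime.getRuntime().availableProcessors(),
     *        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     *        null,       // no UncaughtExceptionHandler
     *        true);      // asyncMode: local tasks processed in FIFO order
     *    asyncPool.execute(() -> System.out.println("event-style task"));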
 267      *
 268      * Adding tasks then takes the form of a classic array push(task)
 269      * in a circular buffer:
 270      *    q.array[q.top++ % length] = task;
 271      *
 272      * The actual code needs to null-check and size-check the array,
 273      * uses masking, not mod, for indexing a power-of-two-sized array,
 274      * enforces memory ordering, supports resizing, and possibly
 275      * signals waiting workers to start scanning (described below),
 276      * which requires stronger forms of order accesses.
 277      *
 278      * The pop operation (always performed by owner) is of the form:
 279      *   if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
 280      *        decrement top and return task;
 281      * If this fails, the queue is empty. This operation is one part
 282      * of the nextLocalTask method, that instead does a local-poll
 283      * in FIFO mode.
 284      *
 285      * The poll operation is, basically:
 286      *   if (CAS nonnull task t = q.array[k = q.base % length] to null)
 287      *       increment base and return task;
 288      *
 289      * However, there are several more cases that must be dealt with.
 290      * Some of them are just due to asynchrony; others reflect
 291      * contention and stealing policies. Stepping through them
 292      * illustrates some of the implementation decisions in this class.
 293      *
 294      *  * Slot k must be read with an acquiring read, which it must
 295      *    anyway to dereference and run the task if the (acquiring)
 296      *    CAS succeeds, but uses an explicit acquire fence to support
 297      *    the following rechecks even if the CAS is not attempted.  To
 298      *    more easily distinguish among kinds of CAS failures, we use
 299      *    the compareAndExchange version, and usually handle null
 300      *    returns (indicating contention) separately from others.
 301      *
 302      *  * q.base may change between reading and using its value to
 303      *    index the slot. To avoid trying to use the wrong t, the
 304      *    index and slot must be reread (not necessarily immediately)
 305      *    until consistent, unless this is a local poll by owner, in
 306      *    which case this form of inconsistency can only appear as t
 307      *    being null, below.
 308      *
 309      *  * Similarly, q.array may change (due to a resize), unless this
 310      *    is a local poll by owner. Otherwise, when t is present, this
 311      *    only needs consideration on CAS failure (since a CAS
 312      *    confirms the non-resized case).
 313      *
 314      *  * t may appear null because a previous poll operation has not
 315      *    yet incremented q.base, so the read is from an already-taken
 316      *    index. This form of stall reflects the non-lock-freedom of
 317      *    the poll operation. Stalls can be detected by observing that
 318      *    q.base doesn't change on repeated reads of null t; when
 319      *    no other alternatives apply, we spin-wait for it to settle.  To
 320      *    reduce producing these kinds of stalls by other stealers, we
 321      *    encourage timely writes to indices using otherwise
 322      *    unnecessarily strong writes.
 323      *
 324      *  * The CAS may fail, in which case we may want to retry unless
 325      *    there is too much contention. One goal is to balance and
 326      *    spread out the many forms of contention that may be
 327      *    encountered across polling and other operations to avoid
 328      *    sustained performance degradations. Across all cases where
 329      *    alternatives exist, a bounded number of CAS misses or stalls
 330      *    are tolerated (for slots, ctl, and elsewhere described
 331      *    below) before taking alternative action. These may move
 332      *    contention or retries elsewhere, which is still preferable
 333      *    to single-point bottlenecks.
 334      *
 335      *  * Even though the check "top == base" is quiescently accurate
 336      *    to determine whether a queue is empty, it is not of much use
 337      *    when deciding whether to try to poll or repoll after a
 338      *    failure.  Both top and base may move independently, and both
 339      *    lag updates to the underlying array. To reduce memory
 340      *    contention, non-owners avoid reading the "top" when
 341      *    possible, by using one-ahead reads to check whether to
 342      *    repoll, relying on the fact that a non-empty queue does not
 343      *    have two null slots in a row, except in cases (resizes and
 344      *    shifts) that can be detected with a secondary recheck that
 345      *    is less likely to conflict with owner writes.
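     *
     * For orientation, a much-simplified sketch of the three operations,
     * ignoring the resizing, fencing, and contention handling described
     * above. The class and field names are illustrative (not the actual
     * WorkQueue code), and AtomicReferenceArray stands in for the
     * Unsafe-based slot accesses:
     *
     *    class SimpleWSDeque {                          // hypothetical
     *        final AtomicReferenceArray<ForkJoinTask<?>> array =
     *            new AtomicReferenceArray<>(1 << 13);   // power-of-two capacity
     *        volatile int base;                         // next slot to poll
     *        int top;                                   // next slot for push
     *        void push(ForkJoinTask<?> task) {          // owner only
     *            array.set((array.length() - 1) & top++, task);
     *        }
     *        ForkJoinTask<?> pop() {                    // owner only
     *            int m = array.length() - 1, s = top - 1;
     *            ForkJoinTask<?> t = array.getAndSet(m & s, null);
     *            if (t != null) top = s;                // taken; else empty
     *            return t;
     *        }
     *        ForkJoinTask<?> poll() {                   // any thread
     *            int m = array.length() - 1, b = base;
     *            ForkJoinTask<?> t = array.get(m & b);
     *            if (t != null && array.compareAndSet(m & b, t, null)) {
     *                base = b + 1;                      // won arbitration
     *                return t;
     *            }
     *            return null;                           // empty, stalled, or lost race
     *        }
     *    }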
 346      *
 347      * The poll operations in q.poll(), scan(), helpJoin(), and
 348      * elsewhere differ with respect to whether other queues are
 349      * available to try, and the presence or nature of screening steps
 350      * when only some kinds of tasks can be taken. When trying
 351      * alternatives (or failing) is an option, they uniformly give up after
 352      * bounded numbers of stalls and/or CAS failures, which reduces
 353      * contention when too many workers are polling too few tasks.
 354      * Overall, in the aggregate, we ensure probabilistic
 355      * non-blockingness of work-stealing at least until checking
 356      * quiescence (which is intrinsically blocking): If an attempted
 357      * steal fails in these ways, a scanning thief chooses a different
 358      * target to try next. In contexts where alternatives aren't
 359      * available, and when progress conditions can be isolated to
 360      * values of a single variable, simple spinloops (using
 361      * Thread.onSpinWait) are used to reduce memory traffic.
 362      *
 363      * WorkQueues are also used in a similar way for tasks submitted
 364      * to the pool. We cannot mix these tasks in the same queues used
 365      * by workers. Instead, we randomly associate submission queues
 366      * with submitting threads, using a form of hashing.  The
 367      * ThreadLocalRandom probe value serves as a hash code for
 368      * choosing existing queues, and may be randomly repositioned upon
 369      * contention with other submitters.  In essence, submitters act
 370      * like workers except that they are restricted to executing local
 371      * tasks that they submitted (or when known, subtasks thereof).
 372      * Insertion of tasks in shared mode requires a lock. We use only
 373      * a simple spinlock (as one role of field "phase") because
 374      * submitters encountering a busy queue move to a different
 375      * position to use or create other queues.  They (spin) block when
 376      * registering new queues, or indirectly elsewhere, by revisiting
 377      * later.
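     *
     * A rough sketch of the submitter-to-queue mapping (hypothetical
     * code; the real externalPush logic also initializes probes, creates
     * queues on demand, and retries; the queues array and its even-index
     * convention for submission queues are described under Management
     * below):
     *
     *    int r = ThreadLocalRandom.getProbe();               // per-thread hash
     *    WorkQueue q = queues[r & (queues.length - 1) & ~1]; // even index
     *    if (q == null || !q.tryLockPhase())                 // absent or busy:
     *        ThreadLocalRandom.advanceProbe(r);              //  reposition, retry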
 378      *
 379      * Management
 380      * ==========
 381      *
 382      * The main throughput advantages of work-stealing stem from
 383      * decentralized control -- workers mostly take tasks from
 384      * themselves or each other, at rates that can exceed a billion
 385      * per second.  Most non-atomic control is performed by some form
 386      * of scanning across or within queues.  The pool itself creates,
 387      * activates (enables scanning for and running tasks),
 388      * deactivates, blocks, and terminates threads, all with minimal
 389      * central information.  There are only a few properties that we
 390      * can globally track or maintain, so we pack them into a small
 391      * number of variables, often maintaining atomicity without
 392      * blocking or locking.  Nearly all essentially atomic control
 393      * state is held in a few variables that are by far most often
 394      * read (not written) as status and consistency checks. We pack as
 395      * much information into them as we can.
 396      *
 397      * Field "ctl" contains 64 bits holding information needed to
 398      * atomically decide to add, enqueue (on an event queue), and
 399      * dequeue and release workers.  To enable this packing, we
 400      * restrict maximum parallelism to (1<<15)-1 (which is far in
 401      * excess of normal operating range) to allow ids, counts, and
 402      * their negations (used for thresholding) to fit into 16bit
 403      * subfields.
 404      *
 405      * Field "runState" and per-WorkQueue field "phase" play similar
 406      * roles, as lockable, versioned counters. Field runState also
 407      * includes monotonic event bits (SHUTDOWN, STOP, and TERMINATED).
 408      * The version tags enable detection of state changes (by
 409      * comparing two reads) modulo bit wraparound. The bit range in
 410      * each case suffices for purposes of determining quiescence,
 411      * termination, avoiding ABA-like errors, and signal control, most
 412      * of which are ultimately based on at most 15bit ranges (due to
 413      * 32767 max total workers). RunState updates do not need to be
 414      * atomic with respect to ctl updates, but because they are not,
 415      * some care is required to avoid stalls. The seqLock properties
 416      * detect changes and conditionally upgrade to coordinate with
 417      * updates. It is typically held for less than a dozen
 418      * instructions unless the queue array is being resized, during
 419      * which contention is rare. To be conservative, lockRunState is
 420      * implemented as a spin/sleep loop. Here and elsewhere spin
 421      * constants are short enough to apply even on systems with few
 422      * available processors.  In addition to checking pool status,
 423      * reads of runState sometimes serve as acquire fences before
 424      * reading other fields.
 425      *
 426      * Field "parallelism" holds the target parallelism (normally
 427      * corresponding to pool size). Users can dynamically reset target
 428      * parallelism, but it is only accessed when signalling or awaiting
 429      * work, so only slowly has an effect in creating threads or
 430      * letting them time out and terminate when idle.
 431      *
 432      * Array "queues" holds references to WorkQueues.  It is updated
 433      * (only during worker creation and termination) under the
 434      * runState lock. It is otherwise concurrently readable but reads
 435      * for use in scans (see below) are always prefaced by a volatile
 436      * read of runState (or equivalent constructions), ensuring that
 437      * its state is current at the point it is used (which is all we
 438      * require). To simplify index-based operations, the array size is
 439      * always a power of two, and all readers must tolerate null
 440      * slots.  Worker queues are at odd indices. Worker phase ids
 441      * masked with SMASK match their index. Shared (submission) queues
 442      * are at even indices. Grouping them together in this way aids in
 443      * task scanning: At top-level, both kinds of queues should be
 444      * sampled with approximately the same probability, which is
 445      * simpler if they are all in the same array. But we also need to
 446      * identify what kind they are without looking at them, leading to
 447      * this odd/even scheme. One disadvantage is that there are
 448      * usually many fewer submission queues, so there can be many
 449      * wasted probes (null slots). But this is still cheaper than
 450      * alternatives. Other loops over the queues array vary in origin
 451      * and stride depending on whether they cover only submission
 452      * (even) or worker (odd) queues or both, and whether they require
 453      * randomness (in which case cyclically exhaustive strides may be
 454      * used).
 455      *
 456      * All worker thread creation is on-demand, triggered by task
 457      * submissions, replacement of terminated workers, and/or
 458      * compensation for blocked workers. However, all other support
 459      * code is set up to work with other policies.  To ensure that we
 460      * do not hold on to worker or task references that would prevent
 461      * GC, all accesses to workQueues in waiting, signalling, and
 462      * control methods are via indices into the queues array (which is
 463      * one source of some of the messy code constructions here). In
 464      * essence, the queues array serves as a weak reference
 465      * mechanism. In particular, the stack top subfield of ctl stores
 466      * indices, not references. Operations on queues obtained from
 467      * these indices remain valid (with at most some unnecessary extra
 468      * work) even if an underlying worker failed and was replaced by
 469      * another at the same index. During termination, worker queue
 470      * array updates are disabled.
 471      *
 472      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
 473      * cannot let workers spin indefinitely scanning for tasks when
 474      * none can be found immediately, and we cannot start/resume
 475      * workers unless there appear to be tasks available.  On the
 476      * other hand, we must quickly prod them into action when new
 477      * tasks are submitted or generated. These latencies are mainly a
 478      * function of JVM park/unpark (and underlying OS) performance,
 479      * which can be slow and variable (even though usages are
 480      * streamlined as much as possible).  In many usages, ramp-up time
 481      * is the main limiting factor in overall performance, which is
 482      * compounded at program start-up by JIT compilation and
 483      * allocation. On the other hand, throughput degrades when too
 484      * many threads poll for too few tasks. (See below.)
 485      *
 486      * The "ctl" field atomically maintains total and "released"
 487      * worker counts, plus the head of the available worker queue
 488      * (actually stack, represented by the lower 32bit subfield of
 489      * ctl).  Released workers are those known to be scanning for
 490      * and/or running tasks (we cannot accurately determine
 491      * which). Unreleased ("available") workers are recorded in the
 492      * ctl stack. These workers are made eligible for signalling by
 493      * enqueuing in ctl (see method runWorker).  This "queue" is a
 494      * form of Treiber stack. This is ideal for activating threads in
 495      * most-recently used order, and improves performance and
 496      * locality, outweighing the disadvantages of being prone to
 497      * contention and inability to release a worker unless it is
 498      * topmost on stack. The top stack state holds the value of the
 499      * "phase" field of the worker: its index and status, plus a
 500      * version counter that, in addition to the count subfields (also
 501      * serving as version stamps) provide protection against Treiber
 502      * stack ABA effects.
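     *
     * A rough sketch of the enqueue (deactivation) step, illustrative
     * only: the real code merges this with updates to the count
     * subfields of ctl, and compareAndSetCtl stands in for the actual
     * CAS on ctl:
     *
     *    long c;
     *    do {
     *        w.stackPred = (int) (c = ctl);       // link to previous top
     *    } while (!compareAndSetCtl(c,            // install w.phase as new top
     *                 (c & 0xffffffff00000000L) | (w.phase & 0xffffffffL)));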
 503      *
 504      * Creating workers. To create a worker, we pre-increment counts
 505      * (serving as a reservation), and attempt to construct a
 506      * ForkJoinWorkerThread via its factory. On starting, the new
 507      * thread first invokes registerWorker, where it constructs a
 508      * WorkQueue and is assigned an index in the queues array
 509      * (expanding the array if necessary).  Upon any exception across
 510      * these steps, or null return from factory, deregisterWorker
 511      * adjusts counts and records accordingly.  If a null return, the
 512      * pool continues running with fewer than the target number
 513      * workers. If exceptional, the exception is propagated, generally
 514      * to some external caller.
 515      *
 516      * WorkQueue field "phase" encodes the queue array id in lower
 517      * bits, and otherwise acts similarly to the pool runState field:
 518      * The "IDLE" bit is clear while active (either a released worker
 519      * or a locked external queue), with other bits serving as a
 520      * version counter to distinguish changes across multiple reads.
 521      * Note that phase field updates lag queue CAS releases; seeing a
 522      * non-idle phase does not guarantee that the worker is available
 523      * (and so is never checked in this way).
 524      *
 525      * The ctl field also serves as the basis for memory
 526      * synchronization surrounding activation. This uses a more
 527      * efficient version of a Dekker-like rule that task producers and
 528      * consumers sync with each other by both writing/CASing ctl (even
 529      * if to its current value).  However, rather than CASing ctl to
 530      * its current value in the common case where no action is
 531      * required, we reduce write contention by ensuring that
 532      * signalWork invocations are prefaced with a fully fenced memory
 533      * access (which is usually needed anyway).
 534      *
 535      * Signalling. Signals (in signalWork) cause new or reactivated
 536      * workers to scan for tasks.  Method signalWork and its callers
 537      * try to approximate the unattainable goal of having the right
 538      * number of workers activated for the tasks at hand, but must err
 539      * on the side of too many workers vs too few to avoid stalls:
 540      *
 541      *  * If computations are purely tree structured, it suffices for
 542      *    every worker to activate another when it pushes a task into
 543      *    an empty queue, resulting in O(log(#threads)) steps to full
 544      *    activation. Emptiness must be conservatively approximated,
 545      *    sometimes resulting in unnecessary signals.  Also, to reduce
 546      *    resource usages in some cases, at the expense of slower
 547      *    startup in others, activation of an idle thread is preferred
 548      *    over creating a new one, here and elsewhere.
 549      *
 550      *  * If instead, tasks come in serially from only a single
 551      *    producer, each worker taking its first (since the last
 552      *    activation) task from a queue should propagate a signal if
 553      *    there are more tasks in that queue. This is equivalent to,
 554      *    but generally faster than, arranging for the stealer to take
 555      *    multiple tasks, re-pushing one or more on its own queue, and
 556      *    signalling (because its queue is empty), also resulting in
 557      *    logarithmic full activation time.
 558      *
 559      *  * Because we don't know about usage patterns (or most commonly,
 560      *    mixtures), we use both approaches, which present even more
 561      *    opportunities to over-signal.  Note that in either of these
 562      *    contexts, signals may be (and often are) unnecessary because
 563      *    active workers continue scanning after running tasks without
 564      *    the need to be signalled (which is one reason work stealing
 565      *    is often faster than alternatives), so additional workers
 566      *    aren't needed. We filter out some of these cases by exiting
 567      *    retry loops in signalWork if the task responsible for the
 568      *    signal has already been taken.
 569      *
 570      * * For rapidly branching tasks that require full pool resources,
 571      *   oversignalling is OK, because signalWork will soon have no
 572      *   more workers to create or reactivate. But for others (mainly
 573      *   externally submitted tasks), overprovisioning may cause very
 574      *   noticeable slowdowns due to contention and resource
 575      *   wastage. All remedies are intrinsically heuristic. We use a
 576      *   strategy that works well in most cases: We track "sources"
 577      *   (queue ids) of non-empty (usually polled) queues while
 578      *   scanning. These are maintained in the "source" field of
 579      *   WorkQueues for use in method helpJoin and elsewhere (see
 580      *   below). We also maintain them as arguments/results of
 581      *   top-level polls (argument "window" in method scan, with setup
 582      *   in method runWorker) as an encoded sliding window of current
 583      *   and previous two sources (or INVALID_ID if none), and stop
 584      *   signalling when all were from the same source. These
 585      *   mechanisms may result in transiently too few workers, but
 586      *   once workers poll from a new source, they rapidly reactivate
 587      *   others.
 588      *
 589      * * Despite these, signal contention and overhead effects still
 590      *   occur during ramp-up and ramp-down of small computations.
 591      *
 592      * Scanning. Method scan performs top-level scanning for (and
 593      * execution of) tasks by polling a pseudo-random permutation of
 594      * the array (by starting at a given index, and using a constant
 595      * cyclically exhaustive stride.)  It uses the same basic polling
 596      * method as WorkQueue.poll(), but restarts with a different
 597      * permutation on each invocation.  The pseudorandom generator
 598      * need not have high-quality statistical properties in the long
 599      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 600      * from ThreadLocalRandom probes, which are cheap and
 601      * suffice. Scans do not otherwise explicitly take into account
 602      * core affinities, loads, cache localities, etc. However, they do
 603      * exploit temporal locality (which usually approximates these) by
 604      * preferring to re-poll from the same queue (either in method
 605      * tryPoll() or scan) after a successful poll before trying others
 606      * (see method topLevelExec), which also reduces bookkeeping,
 607      * cache traffic, and scanning overhead. But it also reduces
 608      * fairness, which is partially counteracted by giving up on
 609      * contention.
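     *
     * A minimal sketch of the index sequence used by such scans (the
     * xorshift constants are the usual Marsaglia triple; the stride
     * derivation shown is illustrative, not the exact code in scan):
     *
     *    int r = seed;                              // nonzero xorshift state
     *    r ^= r << 13; r ^= r >>> 17; r ^= r << 5;  // next pseudorandom value
     *    int n = queues.length;                     // power of two
     *    int i = r & (n - 1);                       // random origin
     *    int step = (r >>> 16) | 1;                 // odd: visits every index
     *    for (int k = n; k > 0; --k, i = (i + step) & (n - 1)) {
     *        WorkQueue q;
     *        if ((q = queues[i]) != null) {
     *            // ... try to poll a task from q ...
     *        }
     *    }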
 610      *
 611      * Deactivation. When method scan indicates that no tasks are
 612      * found by a worker, it tries to deactivate (in awaitWork),
 613      * giving up (and rescanning) on ctl contention. To avoid missed
 614      * signals during deactivation, the method rescans and reactivates
 615      * if there may have been a missed signal during deactivation,
 616      * filtering out most cases in which this is unnecessary. Because
 617      * idle workers are often not yet blocked (parked), we use the
 618      * WorkQueue parking field to advertise that a waiter actually
 619      * needs unparking upon signal.
 620      *
 621      * Quiescence. Workers scan looking for work, giving up when they
 622      * don't find any, without being sure that none are available.
 623      * However, some required functionality relies on consensus about
 624      * quiescence (also termination, discussed below). The count
 625      * fields in ctl allow accurate discovery of states in which all
 626      * workers are idle.  However, because external (asynchronous)
 627      * submitters are not part of this vote, these mechanisms
 628      * themselves do not guarantee that the pool is in a quiescent
 629      * state with respect to methods isQuiescent, shutdown (which
 630      * begins termination when quiescent), helpQuiesce, and indirectly
 631      * others including tryCompensate. Method quiescent() is
 632      * used in all of these contexts. It provides checks that all
 633      * workers are idle and there are no submissions that they could
 634      * poll if they were not idle, retrying on inconsistent reads of
 635      * queues and using the runState seqLock to retry on queue array
 636      * updates.  (It also reports quiescence if the pool is
 637      * terminating.) A true report means only that there was a moment
 638      * at which quiescence held.  False negatives are inevitable (for
 639      * example when queue indices lag updates, as described above),
 640      * which is accommodated when (tentatively) idle by scanning for
 641      * work etc, and then re-invoking. This includes cases in which
 642      * the final unparked thread (in awaitWork) uses quiescent()
 643      * to check for tasks that could have been added during a race
 644      * window that would not be accompanied by a signal, in which case
 645      * it re-activates itself (or any other worker) to rescan. Method
 646      * helpQuiesce acts similarly but cannot rely on ctl counts to
 647      * determine that all workers are inactive because the caller and
 648      * any others executing helpQuiesce are not included in counts.
 649      *
 650      * Termination. A call to shutdownNow invokes tryTerminate to
 651      * atomically set a runState mode bit.  However, the process of
 652      * termination is intrinsically non-atomic. The calling thread, as
 653      * well as other workers that terminate thereafter, helps cancel queued
 654      * tasks and interrupt other workers. These actions race with
 655      * unterminated workers.  By default, workers check for
 656      * termination only when accessing pool state.  This may take a
 657      * while but suffices for structured computational tasks.  But not
 658      * necessarily for others. Class InterruptibleTask (see below)
 659      * further arranges runState checks before executing task bodies,
 660      * and ensures interrupts while terminating. Even so, there are no
 661      * guarantees after an abrupt shutdown that remaining tasks
 662      * complete normally or exceptionally or are cancelled.
 663      * Termination may fail to complete if running tasks ignore both
 664      * task status and interrupts and/or produce more tasks after
 665      * others that could cancel them have exited.
 666      *
 667      * Trimming workers. To release resources after periods of lack of
 668      * use, a worker starting to wait when the pool is quiescent will
 669      * time out and terminate if the pool has remained quiescent for
 670      * a period given by field keepAlive (default 60sec), which applies
 671      * to the first timeout of a fully populated pool. Subsequent (or
 672      * other) cases use delays such that, if still quiescent, all will
 673      * be released before one additional keepAlive unit elapses.
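     *
     * The keepAlive period (along with the compensation-related bounds
     * discussed under "Joining Tasks" below) can be chosen via the
     * long-form public constructor; an illustrative configuration, not a
     * recommendation:
     *
     *    ForkJoinPool pool = new ForkJoinPool(
     *        8,                                             // parallelism
     *        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     *        null,                                          // handler
     *        false,                                         // asyncMode
     *        8,                                             // corePoolSize
     *        64,                                            // maximumPoolSize
     *        1,                                             // minimumRunnable
     *        null,                                          // saturate predicate
     *        30, TimeUnit.SECONDS);                         // keepAlive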
 674      *
 675      * Joining Tasks
 676      * =============
 677      *
 678      * The "Join" part of ForkJoinPools consists of a set of
 679      * mechanisms that sometimes or always (depending on the kind of
 680      * task) avoid context switching or adding worker threads when one
 681      * task would otherwise be blocked waiting for completion of
 682      * another, basically, just by running that task or one of its
 683      * subtasks if not already taken. These mechanics are disabled for
 684      * InterruptibleTasks, which guarantee that callers do not execute
 685      * submitted tasks.
 686      *
 687      * The basic structure of joining is an extended spin/block scheme
 688      * in which workers check for task completion status between
 689      * steps to find other work, until relevant pool state stabilizes
 690      * enough to believe that no such tasks are available, at which
 691      * point they block. This usually yields a good choice of when to
 692      * block, which would otherwise be harder to approximate.
 693      *
 694      * These forms of helping may increase stack space usage, but that
 695      * space is bounded in tree/dag structured procedurally parallel
 696      * designs to be no more than if the task were executed only by
 697      * the joining thread. This is arranged by associated task
 698      * subclasses that also help detect and control the ways in which
 699      * this may occur.
 700      *
 701      * Normally, the first option when joining a task that is not done
 702      * is to try to take it from the local queue and run it. Method
 703      * tryRemoveAndExec tries to do so.  For tasks with any form of
 704      * subtasks that must be completed first, we try to locate these
 705      * subtasks and run them as well. This is easy when local, but
 706      * when stolen, steal-backs are restricted to the same rules as
 707      * stealing (polling), which requires additional bookkeeping and
 708      * scanning. This cost is still very much worthwhile because of
 709      * its impact on task scheduling and resource control.
 710      *
 711      * The two methods for finding and executing subtasks vary in
 712      * details.  The algorithm in helpJoin entails a form of "linear
 713      * helping".  Each worker records (in field "source") the index of
 714      * the internal queue from which it last stole a task. (Note:
 715      * because chains cannot include even-numbered external queues,
 716      * they are ignored, and 0 is an OK default.) The scan in method
 717      * helpJoin uses these markers to try to find a worker to help
 718      * (i.e., steal back a task from and execute it) that could make
 719      * progress toward completion of the actively joined task.  Thus,
 720      * the joiner executes a task that would be on its own local deque
 721      * if the to-be-joined task had not been stolen. This is a
 722      * conservative variant of the approach described in Wagner &
 723      * Calder "Leapfrogging: a portable technique for implementing
 724      * efficient futures" SIGPLAN Notices, 1993
 725      * (http://portal.acm.org/citation.cfm?id=155354). It differs
 726      * mainly in that we only record queues, not full dependency
 727      * links.  This requires a linear scan of the queues to locate
 728      * stealers, but isolates cost to when it is needed, rather than
 729      * adding to per-task overhead.  For CountedCompleters, the
 730      * analogous method helpComplete doesn't need stealer-tracking,
 731      * but requires a similar (but simpler) check of completion
 732      * chains.
 733      *
 734      * In either case, searches can fail to locate stealers when
 735      * stalls delay recording sources or issuing subtasks. We avoid
 736      * some of these cases by using snapshotted values of ctl as a
 737      * check that the numbers of workers are not changing, along with
 738      * rescans to deal with contention and stalls.  But even when
 739      * accurately identified, stealers might not ever produce a task
 740      * that the joiner can in turn help with.
 741      *
 742      * Related method helpAsyncBlocker does not directly rely on
 743      * subtask structure, but instead avoids or postpones blocking of
 744      * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by
 745      * executing other asyncs that can be processed in any order.
 746      * This is currently invoked only in non-join-based blocking
 747      * contexts from classes CompletableFuture and
 748      * SubmissionPublisher; this could be further generalized.
 749      *
 750      * When any of the above fail to avoid blocking, we rely on
 751      * "compensation" -- an indirect form of context switching that
 752      * either activates an existing worker to take the place of the
 753      * blocked one, or expands the number of workers.
 754      *
 755      * Compensation does not by default aim to keep exactly the target
 756      * parallelism number of unblocked threads running at any given
 757      * time. Some previous versions of this class employed immediate
 758      * compensations for any blocked join. However, in practice, the
 759      * vast majority of blockages are transient byproducts of GC and
 760      * other JVM or OS activities that are made worse by replacement,
 761      * which causes longer-term oversubscription. These are inevitable
 762      * without (unobtainably) perfect information about whether worker
 763      * creation is actually necessary.  False alarms are common enough
 764      * to negatively impact performance, so compensation is by default
 765      * attempted only when it appears possible that the pool could
 766      * stall due to lack of any unblocked workers.  However, we allow
 767      * users to override defaults using the long form of the
 768      * ForkJoinPool constructor. The compensation mechanism may also
 769      * be bounded.  Bounds for the commonPool better enable JVMs to
 770      * cope with programming errors and abuse before running out of
 771      * resources to do so.
 772      *
 773      * The ManagedBlocker extension API can't use helping, so it relies
 774      * only on compensation in method awaitBlocker. This API was
 775      * designed to highlight the uncertainty of compensation decisions
 776      * by requiring implementation of method isReleasable to abort
 777      * compensation during attempts to obtain a stable snapshot. But
 778      * users now rely upon the fact that if isReleasable always
 779      * returns false, the API can be used to obtain precautionary
 780      * compensation, which is sometimes the only reasonable option
 781      * when running unknown code in tasks; this is now supported more
 782      * simply (see method beginCompensatedBlock).
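     *
     * A minimal sketch of such precautionary use (the blockCompensated
     * helper and Blocker class are hypothetical; isReleasable returns
     * false until the blocking action completes, so a compensation
     * decision is made before blocking):
     *
     *    static <T> T blockCompensated(Callable<T> action) throws Exception {
     *        class Blocker implements ForkJoinPool.ManagedBlocker {
     *            T result; Exception ex; boolean done;
     *            public boolean isReleasable() { return done; }
     *            public boolean block() {
     *                try { result = action.call(); }
     *                catch (Exception e) { ex = e; }
     *                return done = true;
     *            }
     *        }
     *        Blocker b = new Blocker();
     *        ForkJoinPool.managedBlock(b);          // may compensate first
     *        if (b.ex != null) throw b.ex;
     *        return b.result;
     *    }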
 783      *
 784      * Common Pool
 785      * ===========
 786      *
 787      * The static common pool always exists after static
 788      * initialization.  Since it (or any other created pool) need
 789      * never be used, we minimize initial construction overhead and
 790      * footprint to the setup of about a dozen fields, although with
 791      * some System property parsing and security processing that takes
 792      * far longer than the actual construction when SecurityManagers
 793      * are used or properties are set. The common pool is
 794      * distinguished by having a null workerNamePrefix (which is an
 795      * odd convention, but avoids the need to decode status in factory
 796      * classes).  It also has PRESET_SIZE config set if parallelism
 797      * was configured by system property.
 798      *
 799      * When external threads use the common pool, they can perform
 800      * subtask processing (see helpComplete and related methods) upon
 801      * joins, unless they are submitted using ExecutorService
 802      * submission methods, which implicitly disallow this.  This
 803      * caller-helps policy makes it sensible to set common pool
 804      * parallelism level to one (or more) less than the total number
 805      * of available cores, or even zero for pure caller-runs. External
 806      * threads waiting for joins first check the common pool for their
 807      * task, which fails quickly if the caller did not fork to common
 808      * pool.
 809      *
 810      * Guarantees for common pool parallelism zero are limited to
 811      * tasks that are joined by their callers in a tree-structured
 812      * fashion or use CountedCompleters (as is true for jdk
 813      * parallelStreams). Support infiltrates several methods,
 814      * including those that retry helping steps until we are sure that
 815      * none apply if there are no workers.
 816      *
 817      * As a more appropriate default in managed environments, unless
 818      * overridden by system properties, we use workers of subclass
 819      * InnocuousForkJoinWorkerThread when there is a SecurityManager
 820      * present. These workers have no permissions set, do not belong
 821      * to any user-defined ThreadGroup, and clear all ThreadLocals
 822      * after executing any top-level task.  The associated mechanics
 823      * may be JVM-dependent and must access particular Thread class
 824      * fields to achieve this effect.
 825      *
 826      * InterruptibleTasks
 827      * ==================
 828      *
 829      * Regular ForkJoinTasks manage task cancellation (method cancel)
 830      * independently from the interrupt status of threads running
 831      * tasks.  Interrupts are issued internally only while
 832      * terminating, to wake up workers and cancel queued tasks.  By
 833      * default, interrupts are cleared only when necessary to ensure
 834      * that calls to LockSupport.park do not loop indefinitely (park
 835      * returns immediately if the current thread is interrupted).
 836      *
 837      * To comply with ExecutorService specs, we use subclasses of
 838      * abstract class InterruptibleTask for tasks that require
 839      * stronger interruption and cancellation guarantees.  External
 840      * submitters never run these tasks, even if in the common pool.
 841      * InterruptibleTasks include a "runner" field (implemented
 842      * similarly to FutureTask) to support cancel(true).  Upon pool
 843      * shutdown, runners are interrupted so they can cancel. Since
 844      * external joining callers never run these tasks, they must await
 845      * cancellation by others, which can occur along several different
 846      * paths.
 847      *
 848      * Across these APIs, rules for reporting exceptions for tasks
 849      * with results accessed via join() differ from those via get(),
 850      * which differ from those invoked using pool submit methods by
 851      * non-workers (which comply with Future.get() specs). Internal
 852      * usages of ForkJoinTasks ignore interrupt status when executing
 853      * or awaiting completion.  Otherwise, reporting task results or
 854      * exceptions is preferred to throwing InterruptedExceptions,
 855      * which are in turn preferred to timeouts. Similarly, completion
 856      * status is preferred to reporting cancellation.  Cancellation is
 857      * reported as an unchecked exception by join(), and by worker
 858      * calls to get(), but is otherwise wrapped in a (checked)
 859      * ExecutionException.
 860      *
 861      * Worker Threads cannot be VirtualThreads, as enforced by
 862      * requiring ForkJoinWorkerThreads in factories.  There are
 863      * several constructions relying on this.  However, as of this
 864      * writing, virtual thread bodies are by default run as some form
 865      * of InterruptibleTask.
 866      *
 867      * Memory placement
 868      * ================
 869      *
 870      * Performance is very sensitive to placement of instances of
 871      * ForkJoinPool and WorkQueues and their queue arrays, as well as
 872      * the placement of their fields. Cache misses and contention due
 873      * to false-sharing have been observed to slow down some programs
 874      * by more than a factor of four. Effects may vary across initial
 875      * memory configurations, applications, and different garbage
 876      * collectors and GC settings, so there is no perfect solution.
 877      * Too much isolation may generate more cache misses in common
 878      * cases (because some fields and slots are usually read at the
 879      * same time). The @Contended annotation provides only rough
 880      * control (for good reason). Similarly for relying on fields
 881      * being placed in size-sorted declaration order.
 882      *
 883      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 884      * most false-sharing misses with respect to other fields. Also,
 885      * ForkJoinPool fields are ordered such that fields less prone to
 886      * contention effects are first, offsetting those that otherwise
 887      * would be, while also reducing total footprint vs using
 888      * multiple @Contended regions, which tends to slow down
 889      * less-contended applications. For class WorkQueue, an
 890      * embedded @Contended region segregates fields most heavily
 891      * updated by owners from those most commonly read by stealers or
 892      * other management.  Initial sizing and resizing of WorkQueue
 893      * arrays is an even more delicate tradeoff because the best
 894      * strategy systematically varies across garbage collectors. Small
 895      * arrays are better for locality and reduce GC scan time, but
 896      * large arrays reduce both direct false-sharing and indirect
 897      * cases due to GC bookkeeping (cardmarks etc), and reduce the
 898      * number of resizes, which are not especially fast because they
 899      * require atomic transfers.  Currently, arrays are initialized to
 900      * be fairly small.  (Maintenance note: any changes in fields,
 901      * queues, or their uses, or JVM layout policies, must be
 902      * accompanied by re-evaluation of these placement and sizing
 903      * decisions.)
 904      *
 905      * Style notes
 906      * ===========
 907      *
 908      * Memory ordering relies mainly on atomic operations (CAS,
 909      * getAndSet, getAndAdd) along with moded accesses. These use
 910      * jdk-internal Unsafe for atomics and special memory modes,
 911      * rather than VarHandles, to avoid initialization dependencies in
 912      * other jdk components that require early parallelism.  This can
 913      * be awkward and ugly, but also reflects the need to control
 914      * outcomes across the unusual cases that arise in very racy code
 915      * with very few invariants. All atomic task slot updates use
 916      * Unsafe operations requiring offset positions, not indices, as
 917      * computed by method slotOffset. All fields are read into locals
 918      * before use, and null-checked if they are references, even if
 919      * they can never be null under current usages. Usually,
 920      * computations (held in local variables) are defined as soon as
 921      * logically enabled, sometimes to convince compilers that they
 922      * may be performed despite memory ordering constraints.  Array
 923      * accesses using masked indices include checks (that are always
 924      * true) that the array length is non-zero to avoid compilers
 925      * inserting more expensive traps.  This is usually done in a
 926      * "C"-like style of listing declarations at the heads of methods
 927      * or blocks, and using inline assignments on first encounter.
 928      * Nearly all explicit checks lead to bypass/return, not exception
 929      * throws, because they may legitimately arise during shutdown. A
 930      * few unusual loop constructions hint (with varying
 931      * effectiveness) to JVMs about where (not) to place safepoints.
 932      *
 933      * There is a lot of representation-level coupling among classes
 934      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
 935      * fields of WorkQueue maintain data structures managed by
 936      * ForkJoinPool, so are directly accessed.  There is little point
 937      * trying to reduce this, since any associated future changes in
 938      * representations will need to be accompanied by algorithmic
 939      * changes anyway. Several methods intrinsically sprawl because
 940      * they must accumulate sets of consistent reads of fields held in
 941      * local variables. Some others are artificially broken up to
 942      * reduce producer/consumer imbalances due to dynamic compilation.
 943      * There are also other coding oddities (including several
 944      * unnecessary-looking hoisted null checks) that help some methods
 945      * perform reasonably even when interpreted (not compiled).
 946      *
 947      * The order of declarations in this file is (with a few exceptions):
 948      * (1) Static configuration constants
 949      * (2) Static utility functions
 950      * (3) Nested (static) classes
 951      * (4) Fields, along with constants used when unpacking some of them
 952      * (5) Internal control methods
 953      * (6) Callbacks and other support for ForkJoinTask methods
 954      * (7) Exported methods
 955      * (8) Static block initializing statics in minimally dependent order
 956      *
 957      * Revision notes
 958      * ==============
 959      *
 960      * The main sources of differences from the previous version are:
 961      *
 962      * * New abstract class ForkJoinTask.InterruptibleTask ensures
 963      *   that handling of tasks submitted under the ExecutorService
 964      *   API is consistent with specs.
 965      * * Method quiescent() replaces previous quiescence-related
 966      *   checks, relying on versioning and sequence locking instead
 967      *   of ReentrantLock.
 968      * * Termination processing now ensures that internal data
 969      *   structures are maintained consistently enough while stopping
 970      *   to interrupt all workers and cancel all tasks. It also uses a
 971      *   CountDownLatch instead of a Condition for termination because
 972      *   of the lock change.
 973      * * Many other changes to avoid performance regressions due
 974      *   to the above.
 975      */
 976 
 977     // static configuration constants
 978 
 979     /**
 980      * Default idle timeout value (in milliseconds) for idle threads
 981      * to park waiting for new work before terminating.
 982      */
 983     static final long DEFAULT_KEEPALIVE = 60_000L;
 984 
 985     /**
 986      * Undershoot tolerance for idle timeouts
 987      */
 988     static final long TIMEOUT_SLOP = 20L;
 989 
 990     /**
 991      * The default value for common pool maxSpares.  Overridable using
 992      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
 993      * system property.  The default value is far in excess of normal
 994      * requirements, but also far short of maximum capacity and typical OS

1012     static final int MAX_CAP          = 0x7fff;   // max # workers
1013     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1014     static final int INVALID_ID       = 0x4000;   // unused external queue id
1015 
1016     // pool.runState bits
1017     static final int STOP             = 1 <<  0;   // terminating
1018     static final int SHUTDOWN         = 1 <<  1;   // terminate when quiescent
1019     static final int TERMINATED       = 1 <<  2;   // only set if STOP also set
1020     static final int RS_LOCK          = 1 <<  3;   // lowest seqlock bit
1021 
1022     // spin/sleep limits for runState locking and elsewhere
1023     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1024     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos
1025     static final int MAX_SLEEP        = 1 << 20;   // approx 1 sec  as nanos
1026 
1027     // {pool, workQueue} config bits
1028     static final int FIFO             = 1 << 0;   // fifo queue or access mode
1029     static final int CLEAR_TLS        = 1 << 1;   // set for Innocuous workers
1030     static final int PRESET_SIZE      = 1 << 2;   // size was set by property
1031 
1032     // source history window packing used in scan() and runWorker()
1033     static final long RESCAN          = 1L << 63; // must be negative
1034     static final long HMASK           = ((((long)SMASK) << 32) |
1035                                          (((long)SMASK) << 16)); // history bits
1036     static final long NO_HISTORY      = ((((long)INVALID_ID) << 32) | // no 3rd
1037                                          (((long)INVALID_ID) << 16)); // no 2nd
1038 
1039     // others
1040     static final int DEREGISTERED     = 1 << 31;  // worker terminating
1041     static final int UNCOMPENSATE     = 1 << 16;  // tryCompensate return
1042     static final int IDLE             = 1 << 16;  // phase seqlock/version count
1043 
1044     /*
1045      * Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
1046      * RC: Number of released (unqueued) workers
1047      * TC: Number of total workers
1048      * SS: version count and status of top waiting thread
1049      * ID: poolIndex of top of Treiber stack of waiters
1050      *
1051      * When convenient, we can extract the lower 32 stack top bits
1052      * (including version bits) as sp=(int)ctl. When sp is non-zero,
1053      * there are waiting workers.  Count fields may be transiently
1054      * negative during termination because of out-of-order updates.
1055      * To deal with this, we use casts in and out of "short" and/or
1056      * signed shifts to maintain signedness. Because it occupies
1057      * uppermost bits, we can add one release count using getAndAdd of
1058      * RC_UNIT, rather than CAS, when returning from a blocked join.
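     *
     * For illustration, extracting the subfields looks roughly like this
     * (the shift amounts follow the layout above; this is a sketch, not
     * a quotation of the actual unpacking code):
     *
     *    int rc = (short)(ctl >>> 48);   // released worker count (signed)
     *    int tc = (short)(ctl >>> 32);   // total worker count (signed)
     *    int sp = (int)ctl;              // stack top: version bits plus index
     *    boolean idleWorkersExist = (sp != 0);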

1213 
1214         // fields otherwise causing more unnecessary false-sharing cache misses
1215         @jdk.internal.vm.annotation.Contended("w")
1216         int top;                   // index of next slot for push
1217         @jdk.internal.vm.annotation.Contended("w")
1218         volatile int phase;        // versioned active status
1219         @jdk.internal.vm.annotation.Contended("w")
1220         int stackPred;             // pool stack (ctl) predecessor link
1221         @jdk.internal.vm.annotation.Contended("w")
1222         volatile int source;       // source queue id (or DEREGISTERED)
1223         @jdk.internal.vm.annotation.Contended("w")
1224         int nsteals;               // number of steals from other queues
1225         @jdk.internal.vm.annotation.Contended("w")
1226         volatile int parking;      // nonzero if parked in awaitWork
1227 
1228         // Support for atomic operations
1229         private static final Unsafe U;
1230         private static final long PHASE;
1231         private static final long BASE;
1232         private static final long TOP;
1233         private static final long SOURCE;
1234         private static final long ARRAY;
1235 
1236         final void updateBase(int v) {
1237             U.putIntVolatile(this, BASE, v);
1238         }
1239         final void updateTop(int v) {
1240             U.putIntOpaque(this, TOP, v);
1241         }
1242         final void forgetSource() {
1243             U.putIntOpaque(this, SOURCE, 0);
1244         }
1245         final void updateArray(ForkJoinTask<?>[] a) {
1246             U.getAndSetReference(this, ARRAY, a);
1247         }
1248         final void unlockPhase() {
1249             U.getAndAddInt(this, PHASE, IDLE);
1250         }
1251         final boolean tryLockPhase() {    // seqlock acquire
1252             int p;
1253             return (((p = phase) & IDLE) != 0 &&
1254                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1255         }
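        /*
         * Illustrative sketch (not part of this change), assuming only that
         * IDLE is a single bit above the low bits of phase: adding IDLE while
         * the bit is set clears it (acquiring the seqlock) and carries into
         * the version bits; adding it again restores the bit (releasing).
         */
        static final class PhaseSeqLockSketch {
            static final int IDLE = 1 << 16;
            final java.util.concurrent.atomic.AtomicInteger phase =
                new java.util.concurrent.atomic.AtomicInteger(IDLE);  // initially unlocked
            boolean tryLock() {                   // mirrors tryLockPhase above
                int p = phase.get();
                return (p & IDLE) != 0 && phase.compareAndSet(p, p + IDLE);
            }
            void unlock() {                       // mirrors unlockPhase above
                phase.getAndAdd(IDLE);
            }
        }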
1256 
1257         /**
1258          * Constructor. For internal queues, most fields are initialized
1259          * upon thread start in pool.registerWorker.
1260          */
1261         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1262                   boolean clearThreadLocals) {
1263             if (clearThreadLocals)
1264                 cfg |= CLEAR_TLS;

1297             if ((a = array) == null || (cap = a.length) <= 0 ||
1298                 (room = (m = cap - 1) - (s - b)) < 0) { // could not resize
1299                 if (!internal)
1300                     unlockPhase();
1301                 throw new RejectedExecutionException("Queue capacity exceeded");
1302             }
1303             top = s + 1;
1304             long pos = slotOffset(p = m & s);
1305             if (!internal)
1306                 U.putReference(a, pos, task);         // inside lock
1307             else
1308                 U.getAndSetReference(a, pos, task);   // fully fenced
1309             if (room == 0 && (newCap = cap << 1) > 0) {
1310                 ForkJoinTask<?>[] newArray = null;
1311                 try {                                 // resize for next time
1312                     newArray = new ForkJoinTask<?>[newCap];
1313                 } catch (OutOfMemoryError ex) {
1314                 }
1315                 if (newArray != null) {               // else throw on next push
1316                     int newMask = newCap - 1;         // poll old, push to new

1317                     for (int k = s, j = cap; j > 0; --j, --k) {
1318                         ForkJoinTask<?> u;
1319                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1320                                  a, slotOffset(k & m), null)) == null)
1321                             break;                    // lost to pollers
1322                         newArray[k & newMask] = u;
1323                     }
1324                     updateArray(newArray);            // fully fenced
1325                 }
1326                 a = null;                             // always signal
1327             }
1328             if (!internal)
1329                 unlockPhase();
1330             if ((a == null || a[m & (s - 1)] == null) && pool != null)
1331                 pool.signalWork(a, p);
1332         }
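        /*
         * Illustrative sketch (not part of this change): the resize loop above
         * copies downward from the just-pushed slot into a doubled array,
         * placing each element at its index modulo the new mask so that base
         * and top remain meaningful for the new array. Names here are
         * hypothetical and the poller race (getAndSet to claim slots) is
         * omitted.
         */
        static Object[] growRingSketch(Object[] oldArray, int top) {
            int cap = oldArray.length;            // assumed a power of two
            int m = cap - 1, newCap = cap << 1, newMask = newCap - 1;
            Object[] bigger = new Object[newCap];
            for (int k = top - 1, j = cap; j > 0; --j, --k) {
                Object u = oldArray[k & m];
                if (u == null)
                    break;                        // slot already taken by a poller
                bigger[k & newMask] = u;
            }
            return bigger;
        }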
1333 
1334         /**
1335          * Takes next task, if one exists, in order specified by mode,
1336          * so acts as either local-pop or local-poll. Called only by owner.
1337          * @param fifo nonzero if FIFO mode
1338          */
1339         private ForkJoinTask<?> nextLocalTask(int fifo) {
1340             ForkJoinTask<?> t = null;
1341             ForkJoinTask<?>[] a = array;
1342             int b = base, p = top, cap;
1343             if (a != null && (cap = a.length) > 0) {
1344                 for (int m = cap - 1, s, nb; p - b > 0; ) {
1345                     if (fifo == 0 || (nb = b + 1) == p) {
1346                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347                                  a, slotOffset(m & (s = p - 1)), null)) != null)
1348                             updateTop(s);       // else lost race for only task
1349                         break;
1350                     }

1399         final ForkJoinTask<?> peek() {
1400             ForkJoinTask<?>[] a = array;
1401             int b = base, cfg = config, p = top, cap;
1402             if (p != b && a != null && (cap = a.length) > 0) {
1403                 if ((cfg & FIFO) == 0)
1404                     return a[(cap - 1) & (p - 1)];
1405                 else { // skip over in-progress removals
1406                     ForkJoinTask<?> t;
1407                     for ( ; p - b > 0; ++b) {
1408                         if ((t = a[(cap - 1) & b]) != null)
1409                             return t;
1410                     }
1411                 }
1412             }
1413             return null;
1414         }
1415 
1416         /**
1417          * Polls for a task. Used only by non-owners.
1418          *
1419          * @param pool if nonnull, pool to signal if more tasks exist
1420          */
1421         final ForkJoinTask<?> poll(ForkJoinPool pool) {
1422             for (;;) {
1423                 ForkJoinTask<?>[] a = array;
1424                 int b = base, cap, k;
1425                 if (a == null || (cap = a.length) <= 0)
1426                     break;
1427                 ForkJoinTask<?> t = a[k = b & (cap - 1)];
1428                 U.loadFence();
1429                 if (base == b) {
1430                     Object o;
1431                     int nb = b + 1, nk = nb & (cap - 1);
1432                     if (t == null)
1433                         o = a[k];
1434                     else if (t == (o = U.compareAndExchangeReference(
1435                                        a, slotOffset(k), t, null))) {
1436                         updateBase(nb);
1437                         if (a[nk] != null && pool != null)
1438                             pool.signalWork(a, nk); // propagate
1439                         return t;
1440                     }
1441                     if (o == null && a[nk] == null && array == a &&
1442                         (phase & (IDLE | 1)) != 0 && top - base <= 0)
1443                         break;                    // empty
1444                 }
1445             }
1446             return null;
1447         }
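        /*
         * Illustrative sketch (not part of this change): the race-tolerant
         * poll idiom above, re-expressed with AtomicReferenceArray and
         * AtomicInteger. The helper is hypothetical; the array length is
         * assumed a power of two, and the empty-check is simplified.
         */
        static Object pollSketch(java.util.concurrent.atomic.AtomicReferenceArray<Object> slots,
                                 java.util.concurrent.atomic.AtomicInteger base) {
            for (;;) {
                int b = base.get(), k = b & (slots.length() - 1);
                Object t = slots.get(k);
                if (base.get() != b)
                    continue;                     // base moved; re-read
                if (t == null)
                    return null;                  // appears empty
                if (slots.compareAndSet(k, t, null)) {
                    base.compareAndSet(b, b + 1); // advance past the taken slot
                    return t;
                }
            }
        }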
1448 
1449         /**
1450          * Tries to poll next task in FIFO order, failing without
1451          * retries on contention or stalls. Used only by topLevelExec
1452          * to repoll from the queue obtained from pool.scan.
1453          */
1454         private ForkJoinTask<?> tryPoll() {
1455             ForkJoinTask<?>[] a; int cap;
1456             if ((a = array) != null && (cap = a.length) > 0) {
1457                 for (int b = base, k;;) {  // loop only if inconsistent
1458                     ForkJoinTask<?> t = a[k = b & (cap - 1)];
1459                     U.loadFence();
1460                     if (b == (b = base)) {
1461                         Object o;
1462                         if (t == null)
1463                             o = a[k];
1464                         else if (t == (o = U.compareAndExchangeReference(
1465                                            a, slotOffset(k), t, null))) {
1466                             updateBase(b + 1);
1467                             return t;
1468                         }
1469                         if (o == null)
1470                             break;
1471                     }
1472                 }
1473             }
1474             return null;
1475         }
1476 
1477         // specialized execution methods
1478 
1479         /**
1480          * Runs the given (stolen) task if nonnull, as well as
1481          * remaining local tasks and/or others available from the
1482          * given queue, if any.
1483          */
1484         final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, int srcId) {
1485             int cfg = config, fifo = cfg & FIFO, nstolen = nsteals + 1;
1486             if ((srcId & 1) != 0) // don't record external sources
1487                 source = srcId;
1488             if ((cfg & CLEAR_TLS) != 0)
1489                 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());

1490             while (task != null) {
1491                 task.doExec();
1492                 if ((task = nextLocalTask(fifo)) == null && src != null &&
1493                     (task = src.tryPoll()) != null)
1494                     ++nstolen;
1495             }
1496             nsteals = nstolen;
1497             forgetSource();
1498         }
1499 
1500         /**
1501          * Deep form of tryUnpush: Traverses from top and removes and
1502          * runs task if present.
1503          */
1504         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1505             ForkJoinTask<?>[] a = array;
1506             int b = base, p = top, s = p - 1, d = p - b, cap;
1507             if (a != null && (cap = a.length) > 0) {
1508                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1509                     ForkJoinTask<?> t; int k; boolean taken;
1510                     if ((t = a[k = i & m]) == null)
1511                         break;
1512                     if (t == task) {
1513                         long pos = slotOffset(k);
1514                         if (!internal && !tryLockPhase())
1515                             break;                  // fail if locked
1516                         if (taken =
1517                             (top == p &&

1619 
1620         // misc
1621 
1622         /**
1623          * Returns true if internal and not known to be blocked.
1624          */
1625         final boolean isApparentlyUnblocked() {
1626             Thread wt; Thread.State s;
1627             return ((wt = owner) != null && (phase & IDLE) != 0 &&
1628                     (s = wt.getState()) != Thread.State.BLOCKED &&
1629                     s != Thread.State.WAITING &&
1630                     s != Thread.State.TIMED_WAITING);
1631         }
1632 
1633         static {
1634             U = Unsafe.getUnsafe();
1635             Class<WorkQueue> klass = WorkQueue.class;
1636             PHASE = U.objectFieldOffset(klass, "phase");
1637             BASE = U.objectFieldOffset(klass, "base");
1638             TOP = U.objectFieldOffset(klass, "top");
1639             SOURCE = U.objectFieldOffset(klass, "source");
1640             ARRAY = U.objectFieldOffset(klass, "array");
1641         }
1642     }
1643 
1644     // static fields (initialized in static initializer below)
1645 
1646     /**
1647      * Creates a new ForkJoinWorkerThread. This factory is used unless
1648      * overridden in ForkJoinPool constructors.
1649      */
1650     public static final ForkJoinWorkerThreadFactory
1651         defaultForkJoinWorkerThreadFactory;
1652 
1653     /**
1654      * Common (static) pool. Non-null for public use unless a static
1655      * construction exception, but internal usages null-check on use
1656      * to paranoically avoid potential initialization circularities
1657      * as well as to simplify generated code.
1658      */
1659     static final ForkJoinPool common;

1856     /**
1857      * Final callback from terminating worker, as well as upon failure
1858      * to construct or start a worker.  Removes record of worker from
1859      * array, and adjusts counts. If pool is shutting down, tries to
1860      * complete termination.
1861      *
1862      * @param wt the worker thread, or null if construction failed
1863      * @param ex the exception causing failure, or null if none
1864      */
1865     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1866         WorkQueue w = null;
1867         int src = 0, phase = 0;
1868         boolean replaceable = false;
1869         if (wt != null && (w = wt.workQueue) != null) {
1870             phase = w.phase;
1871             if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
1872                 w.source = DEREGISTERED;
1873                 if (phase != 0) {         // else failed to start
1874                     replaceable = true;
1875                     if ((phase & IDLE) != 0)
1876                         reactivate(w);    // pool stopped before released
1877                 }
1878             }
1879         }
1880         long c = ctl;
1881         if (src != DEREGISTERED)          // decrement counts
1882             do {} while (c != (c = compareAndExchangeCtl(
1883                                    c, ((RC_MASK & (c - RC_UNIT)) |
1884                                        (TC_MASK & (c - TC_UNIT)) |
1885                                        (LMASK & c)))));
1886         else if ((int)c != 0)
1887             replaceable = true;           // signal below to cascade timeouts
1888         if (w != null) {                  // cancel remaining tasks
1889             for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
1890                 try {
1891                     t.cancel(false);
1892                 } catch (Throwable ignore) {
1893                 }
1894             }
1895         }
1896         if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
1897             WorkQueue[] qs; int n, i;     // remove index unless terminating
1898             long ns = w.nsteals & 0xffffffffL;
1899             int stop = lockRunState() & STOP;
1900             if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0 &&
1901                 qs[i = phase & SMASK & (n - 1)] == w) {

1902                 qs[i] = null;
1903                 stealCount += ns;         // accumulate steals
1904             }
1905             unlockRunState();


1906         }
1907         if ((runState & STOP) == 0 && replaceable)
1908             signalWork(null, 0); // may replace unless trimmed or uninitialized
1909         if (ex != null)
1910             ForkJoinTask.rethrow(ex);
1911     }
1912 
1913     /**
1914      * Releases an idle worker, or creates one if not enough exist,
1915      * returning on contention if a signal task is already taken.
1916      *
1917      * @param a if nonnull, a task array holding the signalled task
1918      * @param k index of task in array
1919      */
1920     final void signalWork(ForkJoinTask<?>[] a, int k) {
1921         int pc = parallelism;
1922         for (long c = ctl;;) {
1923             WorkQueue[] qs = queues;
1924             long ac = (c + RC_UNIT) & RC_MASK, nc;
1925             int sp = (int)c, i = sp & SMASK;


1926             if (qs == null || qs.length <= i)
1927                 break;
1928             WorkQueue w = qs[i], v = null;
1929             if (sp == 0) {
1930                 if ((short)(c >>> TC_SHIFT) >= pc)
1931                     break;
1932                 nc = ((c + TC_UNIT) & TC_MASK);
1933             }
1934             else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null)
1935                 break;
1936             else
1937                 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1938             if (c == (c = compareAndExchangeCtl(c, nc | ac))) {


1939                 if (v == null)
1940                     createWorker();
1941                 else {
1942                     v.phase = sp;
1943                     if (v.parking != 0)
1944                         U.unpark(v.owner);
1945                 }
1946                 break;
1947             }
1948             if (a != null && k >= 0 && k < a.length && a[k] == null)
1949                 break;
1950         }
1951     }
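    /*
     * Illustrative sketch (not part of this change): decoding the four 16-bit
     * ctl subfields described above, assuming RC occupies the uppermost 16
     * bits, TC the next 16, and the stack top (version bits plus pool index)
     * the low 32 bits.
     */
    static void decodeCtlSketch(long ctl) {
        int rc = (short)(ctl >>> 48);   // released workers (transiently negative is possible)
        int tc = (short)(ctl >>> 32);   // total workers
        int sp = (int)ctl;              // nonzero if there are waiting workers
        int id = sp & 0xffff;           // pool index of top waiter, valid when sp != 0
        System.out.println("RC=" + rc + " TC=" + tc + " sp=" + sp + " id=" + id);
    }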
1952 
1953     /**
1954      * Reactivates the given worker, and possibly others if not top of
1955      * ctl stack. Called only during shutdown to ensure release on
1956      * termination.
1957      */
1958     private void reactivate(WorkQueue w) {
1959         for (long c = ctl;;) {
1960             WorkQueue[] qs; WorkQueue v; int sp, i;
1961             if ((qs = queues) == null || (sp = (int)c) == 0 ||
1962                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null ||
1963                 (v != w && w != null && (w.phase & IDLE) == 0))
1964                 break;
1965             if (c == (c = compareAndExchangeCtl(
1966                           c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
1967                               (v.stackPred & LMASK))))) {
1968                 v.phase = sp;
1969                 if (v == w)
1970                     break;
1971                 if (v.parking != 0)
1972                     U.unpark(v.owner);
1973             }
1974         }
1975     }
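    /*
     * Illustrative sketch (not part of this change): idle workers form a
     * Treiber stack -- each enqueuer records its predecessor, and a release
     * pops the top by CASing it to that predecessor. Plain linked nodes are
     * used here instead of the packed ctl indices.
     */
    static final class WaiterSketch {
        final WaiterSketch pred;
        WaiterSketch(WaiterSketch pred) { this.pred = pred; }
    }
    static WaiterSketch popWaiterSketch(
        java.util.concurrent.atomic.AtomicReference<WaiterSketch> top) {
        for (;;) {
            WaiterSketch t = top.get();
            if (t == null || top.compareAndSet(t, t.pred))
                return t;               // null if no waiters, else the released one
        }
    }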
1976 
1977     /**
1978      * Internal version of isQuiescent and related functionality.
1979      * @return true if terminating or all workers are inactive and
1980      * submission queues are empty and unlocked; if so, setting STOP
1981      * if shutdown is enabled
1982      */
1983     private boolean quiescent() {
1984         outer: for (;;) {
1985             long phaseSum = 0L;
1986             boolean swept = false;
1987             for (int e, prevRunState = 0; ; prevRunState = e) {
1988                 long c = ctl;
1989                 if (((e = runState) & STOP) != 0)
1990                     return true;                          // terminating
1991                 else if ((c & RC_MASK) > 0L)
1992                     return false;                         // at least one active
1993                 else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
1994                     long sum = c;
1995                     WorkQueue[] qs = queues; WorkQueue q;
1996                     int n = (qs == null) ? 0 : qs.length;
1997                     for (int i = 0; i < n; ++i) {         // scan queues
1998                         if ((q = qs[i]) != null) {
1999                             int p = q.phase, s = q.top, b = q.base;
2000                             sum += (p & 0xffffffffL) | ((long)b << 32);
2001                             if ((p & IDLE) == 0 || s - b > 0) {
2002                                 if ((i & 1) == 0 && compareAndSetCtl(c, c))
2003                                     signalWork(null, 0);  // ensure live
2004                                 return false;
2005                             }
2006                         }
2007                     }
2008                     swept = (phaseSum == (phaseSum = sum));
2009                 }
2010                 else if ((e & SHUTDOWN) == 0)
2011                     return true;
2012                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
2013                     interruptAll();                       // confirmed
2014                     return true;                          // enable termination
2015                 }
2016                 else
2017                     break;                                // restart
2018             }
2019         }
2020     }
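    /*
     * Illustrative sketch (not part of this change): the sweep-until-stable
     * idiom used by quiescent() -- an "all idle and empty" verdict is trusted
     * only when two consecutive sweeps observe the same checksum of per-queue
     * (phase, base) readings. Both suppliers here are hypothetical.
     */
    static boolean looksQuiescentSketch(java.util.function.LongSupplier checksum,
                                        java.util.function.BooleanSupplier anyActive,
                                        int maxSweeps) {
        long prev = checksum.getAsLong();
        while (maxSweeps-- > 0) {
            if (anyActive.getAsBoolean())
                return false;           // definitely not quiescent
            long sum = checksum.getAsLong();
            if (sum == prev)
                return true;            // two identical sweeps: stable
            prev = sum;                 // state changed; sweep again
        }
        return false;                   // still unstable; caller may retry
    }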
2021 
2022     /**
2023      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
2024      * See above for explanation.
2025      *
2026      * @param w caller's WorkQueue (may be null on failed initialization)
2027      */
2028     final void runWorker(WorkQueue w) {
2029         if (w != null) {
2030             int phase = w.phase, r = w.stackPred; // seed from registerWorker
2031             long window = (long)((r >>> 16) & SMASK) | NO_HISTORY;
2032             do {
2033                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
2034             } while ((runState & STOP) == 0 &&
2035                      (((window = scan(w, window, r)) < 0L ||
2036                        ((phase = awaitWork(w, phase)) & IDLE) == 0)));
2037         }
2038     }
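    /*
     * Illustrative sketch (not part of this change): the per-worker seed used
     * for scan strides is advanced with the Marsaglia xorshift step shown in
     * the loop above.
     */
    static int xorshiftSketch(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }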
2039 
2040     /**
2041      * Scans for top-level tasks and, if found, executes them: tries to
2042      * poll each queue starting at an initial index with a random stride,
2043      * returning the next scan window and a retry indicator.
2044      *
2045      * @param w caller's WorkQueue
2046      * @param window up to three queue indices
2047      * @param r random seed
2048      * @return the next window to use, with RESCAN set for rescan

2049      */
2050     private long scan(WorkQueue w, long window, int r) {
2051         WorkQueue[] qs = queues;
2052         int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
2053         long next = window & ~RESCAN;
2054         outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
2055             int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
2056             if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
2057                 (a = q.array) != null && (cap = a.length) > 0) {
2058                 for (int b = q.base;;) {
2059                     int nb = b + 1, nk = nb & (cap - 1), k;
2060                     ForkJoinTask<?> t = a[k = b & (cap - 1)];
2061                     U.loadFence();                // re-read b and t
2062                     if (b == (b = q.base)) {      // else inconsistent; retry
2063                         Object o;
2064                         if (t == null)
2065                             o = a[k];
2066                         else if (t == (o = U.compareAndExchangeReference(
2067                                            a, slotOffset(k), t, null))) {
2068                             q.updateBase(nb);
2069                             next = RESCAN | ((window << 16) & HMASK) | j;
2070                             if (window != next && a[nk] != null)
2071                                 signalWork(a, nk); // limit propagation
2072                             if (w != null)        // always true
2073                                 w.topLevelExec(t, q, j);
2074                             break outer;
2075                         }
2076                         if (o == null) {
2077                             if (next >= 0L && a[nk] != null)
2078                                 next |= RESCAN;
2079                             break;
2080                         }
2081                     }
2082                 }
2083             }
2084         }
2085         return next;


2086     }
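    /*
     * Illustrative sketch (not part of this change): because the queues array
     * length is a power of two, any odd stride is coprime to it, so starting
     * anywhere and repeatedly adding the stride visits every slot exactly once
     * in n steps -- the reason scan() forces step = (r << 1) | 1.
     */
    static int[] scanOrderSketch(int n, int start, int r) {  // n a power of two
        int step = (r << 1) | 1;        // force an odd stride
        int[] order = new int[n];
        for (int l = 0, i = start; l < n; ++l, i += step)
            order[l] = i & (n - 1);
        return order;
    }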
2087 
2088     /**
2089      * Tries to inactivate, and if successful, awaits signal or termination.
2090      *
2091      * @param w the worker (may be null if already terminated)
2092      * @param phase current phase
2093      * @return current phase, with IDLE set if worker should exit
2094      */
2095     private int awaitWork(WorkQueue w, int phase) {
2096         boolean quiet;                           // true if possibly quiescent
2097         int active = phase + (IDLE << 1), p = phase | IDLE, e;
2098         if (w != null) {
2099             w.phase = p;                         // deactivate
2100             long np = active & LMASK, pc = ctl;  // try to enqueue
2101             long qc = np | ((pc - RC_UNIT) & UMASK);
2102             w.stackPred = (int)pc;               // set ctl stack link
2103             if (pc != (pc = compareAndExchangeCtl(pc, qc))) {
2104                 qc = np | ((pc - RC_UNIT) & UMASK);
2105                 w.stackPred = (int)pc;           // retry once
2106                 if (pc != (pc = compareAndExchangeCtl(pc, qc)))
2107                     p = w.phase = phase;         // back out


2108             }
2109             if (p != phase && ((e = runState) & STOP) == 0 &&
2110                 (!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 ||
2111                  !(quiet = quiescent()) || (runState & STOP) == 0)) {
2112                 long deadline = 0L;              // not terminating
2113                 if (quiet) {                     // use timeout if trimmable
2114                     int nt = (short)(qc >>> TC_SHIFT);
2115                     long delay = keepAlive;      // scale if not at target
2116                     if (nt != (nt = Math.max(nt, parallelism)) && nt > 0)
2117                         delay = Math.max(TIMEOUT_SLOP, delay / nt);
2118                     if ((deadline = delay + System.currentTimeMillis()) == 0L)
2119                         deadline = 1L;           // avoid zero
2120                 }
2121                 boolean release = quiet;
2122                 WorkQueue[] qs = queues;         // recheck queues
2123                 int n = (qs == null) ? 0 : qs.length;
2124                 for (int l = -n, j = active; l < n; ++l, ++j) {
2125                     WorkQueue q; ForkJoinTask<?>[] a; int cap;
2126                     if ((p = w.phase) == active) // interleave signal checks
2127                         break;
2128                     if ((q = qs[j & (n - 1)]) != null &&
2129                         (a = q.array) != null && (cap = a.length) > 0 &&
2130                         a[q.base & (cap - 1)] != null) {
2131                         if (release && qc == ctl && compareAndSetCtl(qc, pc)) {
2132                             p = w.phase = active;
2133                             break;               // possible missed signal
2134                         }
2135                         release = true;          // track multiple or reencounter
2136                     }
2137                     Thread.onSpinWait();         // reduce memory traffic
2138                 }
2139                 if (p != active) {               // emulate LockSupport.park
2140                     LockSupport.setCurrentBlocker(this);
2141                     w.parking = 1;
2142                     for (;;) {
2143                         if ((runState & STOP) != 0 || (p = w.phase) == active)
2144                             break;
2145                         U.park(deadline != 0L, deadline);
2146                         if ((p = w.phase) == active || (runState & STOP) != 0)
2147                             break;
2148                         Thread.interrupted();    // clear for next park
2149                         if (deadline != 0L && TIMEOUT_SLOP >
2150                             deadline - System.currentTimeMillis()) {
2151                             long sp = w.stackPred & LMASK, c = ctl;
2152                             long nc = sp | (UMASK & (c - TC_UNIT));
2153                             if (((int)c & SMASK) == (active & SMASK) &&
2154                                 compareAndSetCtl(c, nc)) {
2155                                 w.source = DEREGISTERED;
2156                                 w.phase = active;
2157                                 break;           // trimmed on timeout
2158                             }
2159                             deadline = 0L;       // no longer trimmable
2160                         }

2161                     }
2162                     w.parking = 0;
2163                     LockSupport.setCurrentBlocker(null);
2164                 }

2165             }

2166         }
2167         return p;
2168     }
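    /*
     * Illustrative sketch (not part of this change): the timed wait above
     * parks to an absolute deadline and treats any wakeup within TIMEOUT_SLOP
     * of the deadline as a timeout rather than re-parking for a tiny
     * remainder. The signal check is a hypothetical stand-in for re-reading
     * the worker's phase.
     */
    static boolean parkUntilSignalOrDeadlineSketch(
        java.util.function.BooleanSupplier signalled, long deadline, long slop) {
        while (!signalled.getAsBoolean()) {
            if (deadline - System.currentTimeMillis() < slop)
                return false;           // close enough to the deadline: give up
            java.util.concurrent.locks.LockSupport.parkUntil(deadline);
            Thread.interrupted();       // clear interrupt status before re-parking
        }
        return true;                    // signalled before the deadline
    }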
2169 
2170     /**
2171      * Scans for and returns a polled task, if available.  Used only
2172      * for untracked polls. Begins scan at a random index to avoid
2173      * systematic unfairness.
2174      *
2175      * @param submissionsOnly if true, only scan submission queues
2176      */
2177     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
2178         if ((runState & STOP) == 0) {
2179             WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
2180             int r = ThreadLocalRandom.nextSecondarySeed();
2181             if (submissionsOnly)                 // even indices only
2182                 r &= ~1;
2183             int step = (submissionsOnly) ? 2 : 1;
2184             if ((qs = queues) != null && (n = qs.length) > 0) {
2185                 for (int i = n; i > 0; i -= step, r += step) {
2186                     if ((q = qs[r & (n - 1)]) != null &&
2187                         (t = q.poll(this)) != null)
2188                         return t;
2189                 }
2190             }
2191         }
2192         return null;
2193     }
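    /*
     * Illustrative sketch (not part of this change): external submission
     * queues occupy even indices and worker queues odd ones, so a
     * submissions-only scan starts at a random even index and steps by 2,
     * covering every even slot of a power-of-two table exactly once.
     */
    static Integer firstNonEmptyEvenSlotSketch(Object[] qs, int randomSeed) {
        int n = qs.length;              // assumed a power of two
        int r = randomSeed & ~1;        // start at an even index
        for (int i = n; i > 0; i -= 2, r += 2) {
            int j = r & (n - 1);
            if (qs[j] != null)
                return j;
        }
        return null;                    // no non-empty even slot found
    }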
2194 
2195     /**
2196      * Tries to decrement counts (sometimes implicitly) and possibly
2197      * arrange for a compensating worker in preparation for
2198      * blocking. May fail due to interference, in which case -1 is
2199      * returned so caller may retry. A zero return value indicates
2200      * that the caller doesn't need to re-adjust counts when later
2201      * unblocked.
2202      *
2203      * @param c incoming ctl value
2204      * @return UNCOMPENSATE: block then adjust, 0: block, -1: retry
2205      */
2206     private int tryCompensate(long c) {
2207         Predicate<? super ForkJoinPool> sat;

2739             return n - (a > (p >>>= 1) ? 0 :
2740                         a > (p >>>= 1) ? 1 :
2741                         a > (p >>>= 1) ? 2 :
2742                         a > (p >>>= 1) ? 4 :
2743                         8);
2744         }
2745         return 0;
2746     }
2747 
2748     // Termination
2749 
2750     /**
2751      * Possibly initiates and/or completes pool termination.
2752      *
2753      * @param now if true, unconditionally terminate, else only
2754      * if no work and no active workers
2755      * @param enable if true, terminate when next possible
2756      * @return runState on exit
2757      */
2758     private int tryTerminate(boolean now, boolean enable) {
2759         int e = runState;
2760         if ((e & STOP) == 0) {
2761             if (now) {
2762                 int s = lockRunState();
2763                 runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
2764                 if ((s & STOP) == 0)
2765                     interruptAll();
2766             }
2767             else {
2768                 int isShutdown = (e & SHUTDOWN);
2769                 if (isShutdown == 0 && enable)
2770                     getAndBitwiseOrRunState(isShutdown = SHUTDOWN);
2771                 if (isShutdown != 0)
2772                     quiescent();                 // may trigger STOP
2773                 e = runState;
2774             }
2775         }
2776         if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
2777             int r = (int)Thread.currentThread().threadId(); // stagger traversals
2778             WorkQueue[] qs = queues;
2779             int n = (qs == null) ? 0 : qs.length;
2780             for (int l = n; l > 0; --l, ++r) {
2781                 int j = r & SMASK & (n - 1); WorkQueue q; ForkJoinTask<?> t;
2782                 while ((q = qs[j]) != null && q.source != DEREGISTERED &&
2783                        (t = q.poll(null)) != null) {
2784                     try {
2785                         t.cancel(false);
2786                     } catch (Throwable ignore) {
2787                     }
2788                 }
2789             }
2790             if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {

2791                 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
2792                     CountDownLatch done; SharedThreadContainer ctr;
2793                     if ((done = termination) != null)
2794                         done.countDown();
2795                     if ((ctr = container) != null)
2796                         ctr.close();
2797                 }
2798                 e = runState;
2799             }
2800         }
2801         return e;
2802     }
2803 
2804     /**
2805      * Interrupts all workers
2806      */
2807     private void interruptAll() {
2808         Thread current = Thread.currentThread();

2809         WorkQueue[] qs = queues;
2810         int n = (qs == null) ? 0 : qs.length;
2811         for (int i = 1; i < n; i += 2) {
2812             WorkQueue q; Thread o;
2813             if ((q = qs[i]) != null && (o = q.owner) != null && o != current &&
2814                 q.source != DEREGISTERED) {
2815                 try {
2816                     o.interrupt();
2817                 } catch (Throwable ignore) {
2818                 }
2819             }
2820         }
2821     }
2822 
2823     /**
2824      * Returns termination signal, constructing if necessary
2825      */
2826     private CountDownLatch terminationSignal() {
2827         CountDownLatch signal, s, u;
2828         if ((signal = termination) == null)
2829             signal = ((u = cmpExTerminationSignal(
2830                            s = new CountDownLatch(1))) == null) ? s : u;
2831         return signal;
2832     }
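    /*
     * Illustrative sketch (not part of this change): the one-shot lazy
     * construction above, generalized with AtomicReference -- build a
     * candidate, then keep whichever instance wins the compare-and-exchange.
     */
    static <T> T lazyInitSketch(java.util.concurrent.atomic.AtomicReference<T> ref,
                                java.util.function.Supplier<T> make) {
        T cur = ref.get();
        if (cur != null)
            return cur;
        T fresh = make.get();
        T witness = ref.compareAndExchange(null, fresh);
        return (witness == null) ? fresh : witness;  // loser adopts the winner's instance
    }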
2833 
2834     // Exported methods
2835 
2836     // Constructors
2837 

  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.reflect.Field;
  40 import java.security.AccessController;
  41 import java.security.AccessControlContext;
  42 import java.security.Permission;
  43 import java.security.Permissions;
  44 import java.security.PrivilegedAction;
  45 import java.security.ProtectionDomain;
  46 import java.util.ArrayList;
  47 import java.util.Collection;
  48 import java.util.Collections;
  49 import java.util.List;
  50 import java.util.Objects;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.CountDownLatch;
  53 import java.util.concurrent.locks.LockSupport;
  54 import jdk.internal.access.JavaUtilConcurrentFJPAccess;
  55 import jdk.internal.access.SharedSecrets;
  56 import jdk.internal.misc.Unsafe;
  57 import jdk.internal.vm.SharedThreadContainer;
  58 import jdk.internal.vm.annotation.DontInline;
  59 
  60 /**
  61  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  62  * A {@code ForkJoinPool} provides the entry point for submissions
  63  * from non-{@code ForkJoinTask} clients, as well as management and
  64  * monitoring operations.
  65  *
  66  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  67  * ExecutorService} mainly by virtue of employing
  68  * <em>work-stealing</em>: all threads in the pool attempt to find and
  69  * execute tasks submitted to the pool and/or created by other active
  70  * tasks (eventually blocking waiting for work if none exist). This
  71  * enables efficient processing when most tasks spawn other subtasks
  72  * (as do most {@code ForkJoinTask}s), as well as when many small
  73  * tasks are submitted to the pool from external clients.  Especially
  74  * when setting <em>asyncMode</em> to true in constructors, {@code
  75  * ForkJoinPool}s may also be appropriate for use with event-style
  76  * tasks that are never joined. All worker threads are initialized
  77  * with {@link Thread#isDaemon} set {@code true}.
  78  *

 167  * Upon any error in establishing these settings, default parameters
 168  * are used. It is possible to disable or limit the use of threads in
 169  * the common pool by setting the parallelism property to zero, and/or
 170  * using a factory that may return {@code null}. However doing so may
 171  * cause unjoined tasks to never be executed.
 172  *
 173  * @implNote This implementation restricts the maximum number of
 174  * running threads to 32767. Attempts to create pools with greater
 175  * than the maximum number result in {@code
 176  * IllegalArgumentException}. Also, this implementation rejects
 177  * submitted tasks (that is, by throwing {@link
 178  * RejectedExecutionException}) only when the pool is shut down or
 179  * internal resources have been exhausted.
 180  *
 181  * @since 1.7
 182  * @author Doug Lea
 183  */
 184 public class ForkJoinPool extends AbstractExecutorService {
 185 
 186     /*
 187      * Implementation Overview -- omitted until stable
 188      *
 189      */
 190 
 191     // static configuration constants
 192 
 193     /**
 194      * Default idle timeout value (in milliseconds) for idle threads
 195      * to park waiting for new work before terminating.
 196      */
 197     static final long DEFAULT_KEEPALIVE = 60_000L;
 198 
 199     /**
 200      * Undershoot tolerance for idle timeouts
 201      */
 202     static final long TIMEOUT_SLOP = 20L;
 203 
 204     /**
 205      * The default value for common pool maxSpares.  Overridable using
 206      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
 207      * system property.  The default value is far in excess of normal
 208      * requirements, but also far short of maximum capacity and typical OS

 226     static final int MAX_CAP          = 0x7fff;   // max # workers
 227     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
 228     static final int INVALID_ID       = 0x4000;   // unused external queue id
 229 
 230     // pool.runState bits
 231     static final int STOP             = 1 <<  0;   // terminating
 232     static final int SHUTDOWN         = 1 <<  1;   // terminate when quiescent
 233     static final int TERMINATED       = 1 <<  2;   // only set if STOP also set
 234     static final int RS_LOCK          = 1 <<  3;   // lowest seqlock bit
 235 
 236     // spin/sleep limits for runState locking and elsewhere
 237     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
 238     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos
 239     static final int MAX_SLEEP        = 1 << 20;   // approx 1 sec  as nanos
 240 
 241     // {pool, workQueue} config bits
 242     static final int FIFO             = 1 << 0;   // fifo queue or access mode
 243     static final int CLEAR_TLS        = 1 << 1;   // set for Innocuous workers
 244     static final int PRESET_SIZE      = 1 << 2;   // size was set by property
 245 
 246     // others
 247     static final int DEREGISTERED     = 1 << 31;  // worker terminating
 248     static final int UNCOMPENSATE     = 1 << 16;  // tryCompensate return
 249     static final int IDLE             = 1 << 16;  // phase seqlock/version count
 250 
 251     /*
 252      * Bits and masks for ctl and bounds are packed with four 16-bit subfields:
 253      * RC: Number of released (unqueued) workers
 254      * TC: Number of total workers
 255      * SS: version count and status of top waiting thread
 256      * ID: poolIndex of top of Treiber stack of waiters
 257      *
 258      * When convenient, we can extract the lower 32 stack top bits
 259      * (including version bits) as sp=(int)ctl. When sp is non-zero,
 260      * there are waiting workers.  Count fields may be transiently
 261      * negative during termination because of out-of-order updates.
 262      * To deal with this, we use casts in and out of "short" and/or
 263      * signed shifts to maintain signedness. Because it occupies
 264      * uppermost bits, we can add one release count using getAndAdd of
 265      * RC_UNIT, rather than CAS, when returning from a blocked join.

 420 
 421         // fields otherwise causing more unnecessary false-sharing cache misses
 422         @jdk.internal.vm.annotation.Contended("w")
 423         int top;                   // index of next slot for push
 424         @jdk.internal.vm.annotation.Contended("w")
 425         volatile int phase;        // versioned active status
 426         @jdk.internal.vm.annotation.Contended("w")
 427         int stackPred;             // pool stack (ctl) predecessor link
 428         @jdk.internal.vm.annotation.Contended("w")
 429         volatile int source;       // source queue id (or DEREGISTERED)
 430         @jdk.internal.vm.annotation.Contended("w")
 431         int nsteals;               // number of steals from other queues
 432         @jdk.internal.vm.annotation.Contended("w")
 433         volatile int parking;      // nonzero if parked in awaitWork
 434 
 435         // Support for atomic operations
 436         private static final Unsafe U;
 437         private static final long PHASE;
 438         private static final long BASE;
 439         private static final long TOP;

 440         private static final long ARRAY;
 441 
 442         final void updateBase(int v) {
 443             U.putIntVolatile(this, BASE, v);
 444         }
 445         final void updateTop(int v) {
 446             U.putIntOpaque(this, TOP, v);
 447         }



 448         final void updateArray(ForkJoinTask<?>[] a) {
 449             U.getAndSetReference(this, ARRAY, a);
 450         }
 451         final void unlockPhase() {
 452             U.getAndAddInt(this, PHASE, IDLE);
 453         }
 454         final boolean tryLockPhase() {    // seqlock acquire
 455             int p;
 456             return (((p = phase) & IDLE) != 0 &&
 457                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
 458         }
 459 
 460         /**
 461          * Constructor. For internal queues, most fields are initialized
 462          * upon thread start in pool.registerWorker.
 463          */
 464         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
 465                   boolean clearThreadLocals) {
 466             if (clearThreadLocals)
 467                 cfg |= CLEAR_TLS;

 500             if ((a = array) == null || (cap = a.length) <= 0 ||
 501                 (room = (m = cap - 1) - (s - b)) < 0) { // could not resize
 502                 if (!internal)
 503                     unlockPhase();
 504                 throw new RejectedExecutionException("Queue capacity exceeded");
 505             }
 506             top = s + 1;
 507             long pos = slotOffset(p = m & s);
 508             if (!internal)
 509                 U.putReference(a, pos, task);         // inside lock
 510             else
 511                 U.getAndSetReference(a, pos, task);   // fully fenced
 512             if (room == 0 && (newCap = cap << 1) > 0) {
 513                 ForkJoinTask<?>[] newArray = null;
 514                 try {                                 // resize for next time
 515                     newArray = new ForkJoinTask<?>[newCap];
 516                 } catch (OutOfMemoryError ex) {
 517                 }
 518                 if (newArray != null) {               // else throw on next push
 519                     int newMask = newCap - 1;         // poll old, push to new
 520                     p = newMask & s;
 521                     for (int k = s, j = cap; j > 0; --j, --k) {
 522                         ForkJoinTask<?> u;
 523                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
 524                                  a, slotOffset(k & m), null)) == null)
 525                             break;                    // lost to pollers
 526                         newArray[k & newMask] = u;
 527                     }
 528                     updateArray(a = newArray);        // fully fenced
 529                 }

 530             }
 531             if (!internal)
 532                 unlockPhase();
 533             if ((room == 0 || a[m & (s - 1)] == null) && pool != null)
 534                 pool.signalWork(a, p);
 535         }
 536 
 537         /**
 538          * Takes next task, if one exists, in order specified by mode,
 539          * so acts as either local-pop or local-poll. Called only by owner.
 540          * @param fifo nonzero if FIFO mode
 541          */
 542         private ForkJoinTask<?> nextLocalTask(int fifo) {
 543             ForkJoinTask<?> t = null;
 544             ForkJoinTask<?>[] a = array;
 545             int b = base, p = top, cap;
 546             if (a != null && (cap = a.length) > 0) {
 547                 for (int m = cap - 1, s, nb; p - b > 0; ) {
 548                     if (fifo == 0 || (nb = b + 1) == p) {
 549                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
 550                                  a, slotOffset(m & (s = p - 1)), null)) != null)
 551                             updateTop(s);       // else lost race for only task
 552                         break;
 553                     }

 602         final ForkJoinTask<?> peek() {
 603             ForkJoinTask<?>[] a = array;
 604             int b = base, cfg = config, p = top, cap;
 605             if (p != b && a != null && (cap = a.length) > 0) {
 606                 if ((cfg & FIFO) == 0)
 607                     return a[(cap - 1) & (p - 1)];
 608                 else { // skip over in-progress removals
 609                     ForkJoinTask<?> t;
 610                     for ( ; p - b > 0; ++b) {
 611                         if ((t = a[(cap - 1) & b]) != null)
 612                             return t;
 613                     }
 614                 }
 615             }
 616             return null;
 617         }
 618 
 619         /**
 620          * Polls for a task. Used only by non-owners.
 621          *

 622          */
 623         final ForkJoinTask<?> poll() {
 624             for (;;) {
 625                 ForkJoinTask<?>[] a = array;
 626                 int b = base, cap, k;
 627                 if (a == null || (cap = a.length) <= 0)
 628                     break;
 629                 ForkJoinTask<?> t = a[k = b & (cap - 1)];
 630                 U.loadFence();
 631                 if (base == b) {
 632                     Object o;
 633                     int nb = b + 1, nk = nb & (cap - 1);
 634                     if (t == null)
 635                         o = a[k];
 636                     else if (t == (o = U.compareAndExchangeReference(
 637                                        a, slotOffset(k), t, null))) {
 638                         updateBase(nb);


 639                         return t;
 640                     }
 641                     if (o == null && a[nk] == null && array == a &&
 642                         (phase & (IDLE | 1)) != 0 && top - base <= 0)
 643                         break;                    // empty
 644                 }
 645             }
 646             return null;
 647         }
 648 
 649         // specialized execution methods
 650 
 651         /**
 652          * Runs the given task, as well as remaining local tasks


 653          */
 654         @DontInline // avoid dispatch in caller method (scan)
 655         final void topLevelExec(ForkJoinTask<?> task, int cfg) {


 656             if ((cfg & CLEAR_TLS) != 0)
 657                 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
 658             int fifo = cfg & FIFO;
 659             while (task != null) {
 660                 task.doExec();
 661                 task = nextLocalTask(fifo);


 662             }


 663         }
 664 
 665         /**
 666          * Deep form of tryUnpush: Traverses from top and removes and
 667          * runs task if present.
 668          */
 669         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
 670             ForkJoinTask<?>[] a = array;
 671             int b = base, p = top, s = p - 1, d = p - b, cap;
 672             if (a != null && (cap = a.length) > 0) {
 673                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
 674                     ForkJoinTask<?> t; int k; boolean taken;
 675                     if ((t = a[k = i & m]) == null)
 676                         break;
 677                     if (t == task) {
 678                         long pos = slotOffset(k);
 679                         if (!internal && !tryLockPhase())
 680                             break;                  // fail if locked
 681                         if (taken =
 682                             (top == p &&

 784 
 785         // misc
 786 
 787         /**
 788          * Returns true if internal and not known to be blocked.
 789          */
 790         final boolean isApparentlyUnblocked() {
 791             Thread wt; Thread.State s;
 792             return ((wt = owner) != null && (phase & IDLE) != 0 &&
 793                     (s = wt.getState()) != Thread.State.BLOCKED &&
 794                     s != Thread.State.WAITING &&
 795                     s != Thread.State.TIMED_WAITING);
 796         }
 797 
 798         static {
 799             U = Unsafe.getUnsafe();
 800             Class<WorkQueue> klass = WorkQueue.class;
 801             PHASE = U.objectFieldOffset(klass, "phase");
 802             BASE = U.objectFieldOffset(klass, "base");
 803             TOP = U.objectFieldOffset(klass, "top");

 804             ARRAY = U.objectFieldOffset(klass, "array");
 805         }
 806     }
 807 
 808     // static fields (initialized in static initializer below)
 809 
 810     /**
 811      * Creates a new ForkJoinWorkerThread. This factory is used unless
 812      * overridden in ForkJoinPool constructors.
 813      */
 814     public static final ForkJoinWorkerThreadFactory
 815         defaultForkJoinWorkerThreadFactory;
 816 
 817     /**
 818      * Common (static) pool. Non-null for public use unless a static
 819      * construction exception, but internal usages null-check on use
 820      * to paranoically avoid potential initialization circularities
 821      * as well as to simplify generated code.
 822      */
 823     static final ForkJoinPool common;

1020     /**
1021      * Final callback from terminating worker, as well as upon failure
1022      * to construct or start a worker.  Removes record of worker from
1023      * array, and adjusts counts. If pool is shutting down, tries to
1024      * complete termination.
1025      *
1026      * @param wt the worker thread, or null if construction failed
1027      * @param ex the exception causing failure, or null if none
1028      */
1029     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1030         WorkQueue w = null;
1031         int src = 0, phase = 0;
1032         boolean replaceable = false;
1033         if (wt != null && (w = wt.workQueue) != null) {
1034             phase = w.phase;
1035             if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
1036                 w.source = DEREGISTERED;
1037                 if (phase != 0) {         // else failed to start
1038                     replaceable = true;
1039                     if ((phase & IDLE) != 0)
1040                         releaseAll();     // pool stopped before released
1041                 }
1042             }
1043         }
1044         if (src != DEREGISTERED) {        // decrement counts
1045             long c = ctl;
1046             do {} while (c != (c = compareAndExchangeCtl(
1047                                    c, ((RC_MASK & (c - RC_UNIT)) |
1048                                        (TC_MASK & (c - TC_UNIT)) |
1049                                        (LMASK & c)))));
1050             if (w != null) {              // cancel remaining tasks
1051                 for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
1052                     try {
1053                         t.cancel(false);
1054                     } catch (Throwable ignore) {
1055                     }

1056                 }
1057             }
1058         }
1059         if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
1060             WorkQueue[] qs; int n, i;     // remove index unless terminating
1061             long ns = w.nsteals & 0xffffffffL;
1062             if ((lockRunState() & STOP) != 0)
1063                 replaceable = false;
1064             else if ((qs = queues) != null && (n = qs.length) > 0 &&
1065                      qs[i = phase & SMASK & (n - 1)] == w) {
1066                 qs[i] = null;
1067                 stealCount += ns;         // accumulate steals
1068             }
1069             unlockRunState();
1070             if (replaceable)
1071                 signalWork(null, 0);
1072         }


1073         if (ex != null)
1074             ForkJoinTask.rethrow(ex);
1075     }
1076 
1077     /**
1078      * Releases an idle worker, or creates one if not enough exist,
1079      * returning on contention if a signal task is already taken.
1080      *
1081      * @param a if nonnull, a task array holding the signalled task
1082      * @param k index of task in array
1083      */
1084     final void signalWork(ForkJoinTask<?>[] a, int k) {
1085         int pc = parallelism;
1086         for (long c = ctl;;) {
1087             WorkQueue[] qs = queues;
1088             long ac = (c + RC_UNIT) & RC_MASK, nc;
1089             int sp = (int)c, i = sp & SMASK;
1090             if ((short)(c >>> RC_SHIFT) >= pc)
1091                 break;
1092             if (qs == null || qs.length <= i)
1093                 break;
1094             WorkQueue w = qs[i], v = null;
1095             if (sp == 0) {
1096                 if ((short)(c >>> TC_SHIFT) >= pc)
1097                     break;
1098                 nc = ac | ((c + TC_UNIT) & TC_MASK);
1099             }
1100             else if ((v = w) == null)
1101                 break;
1102             else
1103                 nc = ac | (c & TC_MASK) | (v.stackPred & LMASK);
1104             if (a != null && a.length > k && k >= 0 && a[k] == null)
1105                 break;
1106             if (c == (c = compareAndExchangeCtl(c, nc))) {
1107                 if (v == null)
1108                     createWorker();
1109                 else {
1110                     v.phase = sp;
1111                     if (v.parking != 0)
1112                         U.unpark(v.owner);
1113                 }
1114                 break;
1115             }


1116         }
1117     }
1118 
1119     /**
1120      * Releases all waiting workers by repeatedly popping the top of the
1121      * ctl stack. Called during shutdown to ensure release on
1122      * termination.
1123      */
1124     private void releaseAll() {
1125         for (long c = ctl;;) {
1126             WorkQueue[] qs; WorkQueue v; int sp, i;
1127             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1128                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1129                 break;
1130             if (c == (c = compareAndExchangeCtl(
1131                           c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
1132                               (v.stackPred & LMASK))))) {
1133                 v.phase = sp;


1134                 if (v.parking != 0)
1135                     U.unpark(v.owner);
1136             }
1137         }
1138     }
1139 
1140     /**
1141      * Internal version of isQuiescent and related functionality.
1142      * @return true if terminating or all workers are inactive and
1143      * submission queues are empty and unlocked; if so, setting STOP
1144      * if shutdown is enabled
1145      */
1146     private boolean quiescent() {
1147         outer: for (;;) {
1148             long phaseSum = 0L;
1149             boolean swept = false;
1150             for (int e, prevRunState = 0; ; prevRunState = e) {
1151                 long c = ctl;
1152                 if (((e = runState) & STOP) != 0)
1153                     return true;                          // terminating
1154                 else if ((c & RC_MASK) > 0L)
1155                     return false;                         // at least one active
1156                 else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
1157                     long sum = c;
1158                     WorkQueue[] qs = queues; WorkQueue q;
1159                     int n = (qs == null) ? 0 : qs.length;
1160                     for (int i = 0; i < n; ++i) {         // scan queues
1161                         if ((q = qs[i]) != null) {
1162                             int p = q.phase, s = q.top, b = q.base;
1163                             sum += (p & 0xffffffffL) | ((long)b << 32);
1164                             if ((p & IDLE) == 0 || s - b > 0) {
1165                                 if ((i & 1) == 0 && compareAndSetCtl(c, c))
1166                                     signalWork(q.array, q.base);  // ensure live
1167                                 return false;
1168                             }
1169                         }
1170                     }
1171                     swept = (phaseSum == (phaseSum = sum));
1172                 }
1173                 else if ((e & SHUTDOWN) == 0)
1174                     return true;
1175                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
1176                     releaseAll();                         // confirmed
1177                     return true;                          // enable termination
1178                 }
1179                 else
1180                     break;                                // restart
1181             }
1182         }
1183     }
1184 
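         /*
          * Note on the sweep above: quiescence is reported only after two
          * consecutive passes observe the same checksum of queue phases and
          * bases, with no released workers, runState unchanged, and ctl
          * revalidated via compareAndSetCtl(c, c) (a CAS to the same value,
          * used only as an atomic recheck). Any queue found non-idle or
          * non-empty aborts the sweep; for even-indexed (submission) queues
          * a worker is also signalled to keep the pending work live.
          */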
1185     /**
1186      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1187      * See above for explanation.
1188      *
1189      * @param w caller's WorkQueue (may be null on failed initialization)
1190      */
1191     final void runWorker(WorkQueue w) {
1192         if (w != null) {
1193             int cfg = w.config & (FIFO|CLEAR_TLS), r = w.stackPred;
1194             do {
1195                 r = scan(w, r, cfg);
1196             } while (awaitWork(w) == 0);
1197         }
1198     }
1199 
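         /*
          * In brief: each worker alternates scan (find and run tasks) with
          * awaitWork (deactivate and possibly park), exiting when awaitWork
          * returns nonzero, which happens on pool termination or when an
          * idle timeout trims this worker.
          */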
1200     /**
1201      * Scans for and executes top-level tasks until none found.
1202      *
1203      * @param w caller's WorkQueue
1205      * @param r random seed
1206      * @param cfg config bits
1207      * @return random seed, for next use
1208      */
1209     private int scan(WorkQueue w, int r, int cfg) {
1210         int inactive = 0;                           // nonzero after empty scan
1211         int nsteals = 0;                            // to update stealCount
1212         boolean screening = false;                  // self-signal precheck
1213         boolean reactivatable = false;              // enable self-signalling
1214         boolean running = false;                    // ran task since activating
1215         boolean rescan = true;
1216         for (;;) {
1217             int e, n; WorkQueue[] qs;
1218             if (((e = runState) & STOP) != 0 || w == null)
1219                 break;
1220             boolean wasScreening = screening;
1221             screening = false;
1222             if (!rescan) {
1223                 if (inactive != 0) {
1224                     int noise = (((SPIN_WAITS >>> 1) - 1) & (r >>> 16)) | 0xf;
1225                     int spins = (wasScreening) ? noise | SPIN_WAITS : noise;
1226                     while ((inactive = w.phase & IDLE) != 0 && --spins > 0)
1227                         Thread.onSpinWait();        // reduce flailing
1228                     if (inactive != 0 && !(wasScreening & reactivatable))
1229                         break;                      // block or terminate
1230                 }
1231                 else {
1232                     long pc = ctl;                  // try to deactivate
1233                     int phase = w.phase;
1234                     long qc = (((phase + (IDLE << 1)) & LMASK) |
1235                                ((pc - RC_UNIT) & UMASK));
1236                     w.stackPred = (int)pc;          // set ctl stack link
1237                     w.phase = phase | IDLE;
1238                     if (runState != e || !compareAndSetCtl(pc, qc))
1239                         w.phase = phase;            // back out
1240                     else if ((qc & RC_MASK) <= 0L)
1241                         break;                      // check quiescent shutdown
1242                     else {
1243                         screening = true;
1244                         running = false;
1245                         inactive = IDLE;
1246                     }
1247                 }
1248             }
1249             else if (inactive != 0)
1250                 inactive = w.phase & IDLE;
1251             rescan = reactivatable = false;
1252             r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1253             if ((qs = queues) == null || (n = qs.length) <= 0)
1254                 break;                              // currently impossible
1255             int steps = n, i = r, stride = (r >>> 24) | 1;
1256             do {
1257                 int src, cap; WorkQueue q; ForkJoinTask<?>[] a;
1258                 if ((q = qs[src = (i += stride) & (n - 1)]) != null &&
1259                     (a = q.array) != null && (cap = a.length) > 0) {
1260                     for (int b = 0;;) {
1261                         int pb = b, k, nk;
1262                         ForkJoinTask<?> t = a[k = (cap - 1) & (b = q.base)];
1263                         U.loadFence();              // re-read
1264                         if (q.base != b) {          // inconsistent
1265                             if (rescan | screening)
1266                                 break;              // reduce contention
1267                             else
1268                                 continue;
1269                         }
1270                         if (t == null) {
1271                             ForkJoinTask<?>[] na = q.array;
1272                             if (a[k] != null)       // stale
1273                                 continue;
1274                             else if (a[(b + 1) & (cap - 1)] == null) {
1275                                 if (na != a)        // resized
1276                                     rescan = true;  // else probably empty
1277                                 break;
1278                             }
1279                             else if (rescan | screening)
1280                                 break;              // already retrying
1281                             else if (pb == b) {
1282                                 rescan = true;
1283                                 break;              // retry later
1284                             }
1285                             else
1286                                 continue;           // not (yet) stalled
1287                         }
1288                         long kp = slotOffset(k);
1289                         if (inactive != 0) {        // recheck or reactivate
1290                             int p = w.phase, sp = w.stackPred, nextp; long c;
1291                             if ((inactive = p & IDLE) == 0)
1292                                 rescan = true;
1293                             else if (screening) {
1294                                 reactivatable = true;
1295                                 break;              // not yet enabled
1296                             }
1297                             else if ((nextp = p + 1) != (int)(c = ctl))
1298                                 break;              // ineligible
1299                             else if (q.base != b)
1300                                 break;              // already taken
1301                             else if (!compareAndSetCtl(
1302                                          c, ((sp & LMASK) |
1303                                              ((c + RC_UNIT) & UMASK))))
1304                                 break;              // lost race
1305                             else {
1306                                 w.phase = nextp;
1307                                 inactive = 0;       // self-signalled
1308                             }
1309                         }
1310                         if (!U.compareAndSetReference(a, kp, t, null)) {
1311                             if (q.base != b) {      // contention
1312                                 if (rescan | screening)
1313                                     break;
1314                                 ++b;                // for next stall check
1315                             }
1316                         }
1317                         else {
1318                             q.base = ++b;           // taken
1319                             ForkJoinTask<?> nt = a[nk = b & (cap - 1)];
1320                             w.source = src;         // volatile write
1321                             ++nsteals;
1322                             boolean propagate = !running;
1323                             rescan = running = true;
1324                             if (propagate && nt != null)
1325                                 signalWork(a, nk);
1326                             w.topLevelExec(t, cfg);
1327                             if (q.base != b)
1328                                 break;             // reduce interference
1329                         }
1330                     }
1331                 }
1332             } while (--steps > 0);
1333         }
1334         if (nsteals > 0 && w != null)
1335             w.nsteals += nsteals;
1336         return r;
1337     }
1338 
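         /*
          * Traversal note for scan: after the xorshift update of r, the
          * queues array (whose length n is a power of two) is walked from a
          * random starting index with a random odd stride; an odd stride is
          * coprime with n, so n steps visit every slot exactly once, in an
          * order that differs across workers and scans to reduce contention.
          */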
1339     /**
1340      * Awaits signal or termination.
1341      *
1342      * @param w the WorkQueue (may be null if already terminated)
1343      * @return nonzero for exit
1344      */
1345     private int awaitWork(WorkQueue w) {
1346         int p = IDLE, phase;
1347         if ((runState & STOP) == 0 &&      // check for quiescent shutdown
1348             ((ctl & RC_MASK) > 0L || !quiescent() || (runState & STOP) == 0) &&
1349             w != null && (p = (phase = w.phase) & IDLE) != 0) {
1350             LockSupport.setCurrentBlocker(this);
1351             int active = phase + IDLE;     // next w.phase
1352             long c, deadline = 0L;         // set if all idle and w is ctl top
1353             if (((c = ctl) & RC_MASK) <= 0L && (int)c == active) {
1354                 int np = parallelism, nt = (short)(c >>> TC_SHIFT);
1355                 long delay = keepAlive;    // scale if not fully populated
1356                 if (nt != (nt = Math.max(nt, np)) && nt > 0)
1357                     delay = Math.max(TIMEOUT_SLOP, delay / nt);
1358                 long d = delay + System.currentTimeMillis();
1359                 deadline = (d == 0L) ? 1L : d;
1360             }
1361             if ((p = w.phase & IDLE) != 0) {
1362                 w.parking = 1;             // enable unpark
1363                 for (;;) {                 // emulate LockSupport.park
1364                     if ((runState & STOP) != 0)
1365                         break;
1366                     if ((p = w.phase & IDLE) == 0)
1367                         break;
1368                     U.park(deadline != 0L, deadline);
1369                     if ((p = w.phase & IDLE) == 0)
1370                         break;
1371                     if ((runState & STOP) != 0)
1372                         break;
1373                     Thread.interrupted();  // clear for next park
1374                     if (deadline != 0L &&  // try to trim
1375                         deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
1376                         long sp = w.stackPred & LMASK, dc = ctl;
1377                         long nc = sp | (UMASK & (dc - TC_UNIT));
1378                         if ((int)dc == active && compareAndSetCtl(dc, nc)) {
1379                             WorkQueue[] qs; WorkQueue v; int vp, i;
1380                             w.source = DEREGISTERED;
1381                             w.phase = active; // try to wake up next waiter
1382                             if ((vp = (int)nc) != 0 && (qs = queues) != null &&
1383                                 qs.length > (i = vp & SMASK) &&
1384                                 (v = qs[i]) != null &&
1385                                 compareAndSetCtl(nc, ((UMASK & (nc + RC_UNIT)) |
1386                                                       (nc & TC_MASK) |
1387                                                       (v.stackPred & LMASK)))) {
1388                                 v.phase = vp;
1389                                 U.unpark(v.owner);
1390                             }
1391                             break;
1392                         }
1393                         deadline = 0L;     // no longer trimmable
1394                     }
1395                 }
1396                 w.parking = 0;             // disable unpark
1397             }
1398             LockSupport.setCurrentBlocker(null);
1399         }
1400         return p;
1401     }
1402 
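         /*
          * Idle-timeout handling above, in brief: a deadline is set only
          * when this worker is at the top of the ctl stack and no workers
          * are released; the keepAlive delay is scaled down when the pool
          * is not yet fully populated. On timeout the worker removes itself
          * (decrementing TC and marking its source DEREGISTERED) and, if
          * possible, reactivates the next stacked waiter so that surplus
          * workers are trimmed one at a time.
          */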
1403     /**
1404      * Scans for and returns a polled task, if available.  Used only
1405      * for untracked polls. Begins scan at a random index to avoid
1406      * systematic unfairness.
1407      *
1408      * @param submissionsOnly if true, only scan submission queues
1409      */
1410     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1411         if ((runState & STOP) == 0) {
1412             WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
1413             int r = ThreadLocalRandom.nextSecondarySeed();
1414             if (submissionsOnly)                 // even indices only
1415                 r &= ~1;
1416             int step = (submissionsOnly) ? 2 : 1;
1417             if ((qs = queues) != null && (n = qs.length) > 0) {
1418                 for (int i = n; i > 0; i -= step, r += step) {
1419                     if ((q = qs[r & (n - 1)]) != null &&
1420                         (t = q.poll()) != null)
1421                         return t;
1422                 }
1423             }
1424         }
1425         return null;
1426     }
1427 
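         /*
          * Index convention assumed by pollScan (and used elsewhere here,
          * e.g. in quiescent and helpTerminate): external submission queues
          * occupy even slots of the queues array and owned worker queues
          * occupy odd slots, so scanning submissions only means clearing
          * the low bit of the starting index and stepping by 2.
          */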
1428     /**
1429      * Tries to decrement counts (sometimes implicitly) and possibly
1430      * arrange for a compensating worker in preparation for
1431      * blocking. May fail due to interference, in which case -1 is
1432      * returned so caller may retry. A zero return value indicates
1433      * that the caller doesn't need to re-adjust counts when later
1434      * unblocked.
1435      *
1436      * @param c incoming ctl value
1437      * @return UNCOMPENSATE: block then adjust, 0: block, -1: retry
1438      */
1439     private int tryCompensate(long c) {
1440         Predicate<? super ForkJoinPool> sat;

1972             return n - (a > (p >>>= 1) ? 0 :
1973                         a > (p >>>= 1) ? 1 :
1974                         a > (p >>>= 1) ? 2 :
1975                         a > (p >>>= 1) ? 4 :
1976                         8);
1977         }
1978         return 0;
1979     }
1980 
1981     // Termination
1982 
1983     /**
1984      * Possibly initiates and/or completes pool termination.
1985      *
1986      * @param now if true, unconditionally terminate, else only
1987      * if no work and no active workers
1988      * @param enable if true, terminate when next possible
1989      * @return runState on exit
1990      */
1991     private int tryTerminate(boolean now, boolean enable) {
1992         int e = runState, isShutdown;
1993         if ((e & STOP) == 0) {
1994             if (now) {
1995                 runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN;
1996                 releaseAll();
1997             }
1998             else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) {
1999                 if (isShutdown == 0)
2000                     getAndBitwiseOrRunState(SHUTDOWN);
2001                 quiescent();                    // may trigger STOP
2002                 e = runState;
2003             }
2004         }
2005         if ((e & (STOP | TERMINATED)) == STOP) {
2006             if ((ctl & RC_MASK) > 0L)           // avoid if quiescent shutdown
2007                 helpTerminate(now);
2008             if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
2009                 e |= TERMINATED;
2010                 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
2011                     CountDownLatch done; SharedThreadContainer ctr;
2012                     if ((done = termination) != null)
2013                         done.countDown();
2014                     if ((ctr = container) != null)
2015                         ctr.close();
2016                 }
2017             }
2018         }
2019         return e;
2020     }
2021 
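         /*
          * Termination stages coordinated above, roughly: SHUTDOWN disables
          * new submissions; STOP (set directly when now is true, or by
          * quiescent() once the pool is idle) leads to helpTerminate, which
          * cancels queued tasks and interrupts workers unless the pool is
          * already quiescent; TERMINATED is set once ctl shows no remaining
          * workers, at which point the termination latch, if any, is counted
          * down and the shared thread container is closed.
          */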
2022     /**
2023      * Cancels tasks and interrupts workers.
2024      */
2025     private void helpTerminate(boolean now) {
2026         Thread current = Thread.currentThread();
2027         int r = (int)current.threadId();   // stagger traversals
2028         WorkQueue[] qs = queues;
2029         int n = (qs == null) ? 0 : qs.length;
2030         for (int l = n; l > 0; --l, ++r) {
2031             WorkQueue q; ForkJoinTask<?> t; Thread o;
2032             int j = r & SMASK & (n - 1);
2033             if ((q = qs[j]) != null && q.source != DEREGISTERED) {
2034                 while ((t = q.poll()) != null) {
2035                     try {
2036                         t.cancel(false);
2037                     } catch (Throwable ignore) {
2038                     }
2039                 }
2040                 if ((r & 1) != 0 && (o = q.owner) != null &&
2041                     o != current && q.source != DEREGISTERED &&
2042                     (now || !o.isInterrupted())) {
2043                     try {
2044                         o.interrupt();
2045                     } catch (Throwable ignore) {
2046                     }
2047                 }
2048             }
2049         }
2050     }
2051 
2052     /**
2053      * Returns the termination signal, constructing it if necessary.
2054      */
2055     private CountDownLatch terminationSignal() {
2056         CountDownLatch signal, s, u;
2057         if ((signal = termination) == null)
2058             signal = ((u = cmpExTerminationSignal(
2059                            s = new CountDownLatch(1))) == null) ? s : u;
2060         return signal;
2061     }
2062 
2063     // Exported methods
2064 
2065     // Constructors
2066 