
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java


 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule in which task producers and
 555      * consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
 561      *
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  Method signalWork and its callers
 564      * try to approximate the unattainable goal of having the right
 565      * number of workers activated for the tasks at hand, but must err
 566      * on the side of too many workers vs too few to avoid stalls:
 567      *
 568      *  * If computations are purely tree structured, it suffices for
 569      *    every worker to activate another when it pushes a task into
 570      *    an empty queue, resulting in O(log(#threads)) steps to full
 571      *    activation. Emptiness must be conservatively approximated,
 572      *    which may result in unnecessary signals.  Also, to reduce
 573      *    resource usages in some cases, at the expense of slower
 574      *    startup in others, activation of an idle thread is preferred
 575      *    over creating a new one, here and elsewhere.
 576      *
 577      *  * At the other extreme, if "flat" tasks (those that do not in
 578      *    turn generate others) come in serially from only a single
 579      *    producer, each worker taking a task from a queue should
 580      *    propagate a signal if there are more tasks in that
 581      *    queue. This is equivalent to, but generally faster than,
 582      *    arranging that the stealer take multiple tasks, re-pushing one or
 583      *    more on its own queue, and signalling (because its queue is
 584      *    empty), also resulting in logarithmic full activation
 585      *    time. If tasks do not engage in unbounded loops based on
 586      *    the actions of other workers with unknown dependencies,
 587      *    this form of propagation can be limited to one signal per
 588      *    activation (phase change). We distinguish the cases by
 589      *    further signalling only if the task is an InterruptibleTask
 590      *    (see below), the only supported form of task that
 591      *    may do so.
 592      *
 593      *  * Because we don't know about usage patterns (or most commonly,
 594      *    mixtures), we use both approaches, which present even more
 595      *    opportunities to over-signal. (Failure to distinguish these
 596      *    cases in terms of submission methods was arguably an early
 597      *    design mistake.)  Note that in either of these contexts,
 598      *    signals may be (and often are) unnecessary because active
 599      *    workers continue scanning after running tasks without the
 600      *    need to be signalled (which is one reason work stealing is
 601      *    often faster than alternatives), so additional workers
 602      *    aren't needed.
 603      *
 604      * * For rapidly branching tasks that require full pool resources,
 605      *   oversignalling is OK, because signalWork will soon have no
 606      *   more workers to create or reactivate. But for others (mainly
 607      *   externally submitted tasks), overprovisioning may cause very
 608      *   noticeable slowdowns due to contention and resource
 609      *   wastage. We reduce impact by deactivating workers when
 610      *   queues don't have accessible tasks, but reactivating and
 611      *   rescanning if other tasks remain.
 612      *
 613      * * Despite these, signal contention and overhead effects still
 614      *   occur during ramp-up and ramp-down of small computations.
 615      *
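The O(log(#threads)) full-activation claim in the first bullet can be sketched numerically: if every newly activated worker signals one more, the active count doubles per step. A toy model (illustrative only, not pool code):

```java
// Toy model of tree-structured signal propagation: each active worker
// activates one peer per step, so the active count doubles until all
// workers are running.
public class ActivationSteps {
    static int stepsToFullActivation(int threads) {
        int active = 1, steps = 0;
        while (active < threads) {
            active += active;   // every active worker signals one more
            steps++;
        }
        return steps;           // == ceil(log2(threads))
    }
}
```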
 616      * Scanning. Method runWorker performs top-level scanning for (and
 617      * execution of) tasks by polling a pseudo-random permutation of
 618      * the array (by starting at a given index, and using a constant
 619      * cyclically exhaustive stride).  It uses the same basic polling
 620      * method as WorkQueue.poll(), but restarts with a different
 621      * permutation on each invocation.  The pseudorandom generator
 622      * need not have high-quality statistical properties in the long
 623      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 624      * from ThreadLocalRandom probes, which are cheap and
 625      * suffice. Each queue's polling attempts to avoid becoming stuck
 626      * when other scanners/pollers stall.  Scans do not otherwise
 627      * explicitly take into account core affinities, loads, cache
 628      * localities, etc. However, they do exploit temporal locality
 629      * (which usually approximates these) by preferring to re-poll
 630      * from the same queue after a successful poll before trying
 631      * others, which also reduces bookkeeping, cache traffic, and
 632      * scanning overhead. But it also reduces fairness, which is
 633      * partially counteracted by giving up on detected interference
 634      * (which also reduces contention when too many workers try to
 635      * take small tasks from the same queue).
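The "cyclically exhaustive stride" property can be checked in isolation: with a power-of-two array length, any odd stride is coprime to the length, so starting anywhere and repeatedly adding it (under the mask) touches every index exactly once. A standalone sketch (not pool code):

```java
// Demonstrates why an odd stride over a power-of-two-length array is
// cyclically exhaustive: n masked steps of "i += step" visit every
// index exactly once when step is odd (coprime to n).
public class ScanOrder {
    static boolean visitsAll(int n, int start, int step) { // n: power of two
        boolean[] seen = new boolean[n];
        int visited = 0, i = start;
        for (int l = n; l > 0; --l, i += step) {
            int j = i & (n - 1);
            if (!seen[j]) { seen[j] = true; visited++; }
        }
        return visited == n;
    }
}
```

An even stride, by contrast, shares a factor of 2 with the length and revisits only part of the array.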
 636      *
 637      * Deactivation. When no tasks are found by a worker in runWorker,
 638      * it tries to deactivate(), giving up (and rescanning) on "ctl"
 639      * contention. To avoid missed signals during deactivation, the
 640      * method rescans and reactivates if there may have been a missed
 641      * signal. To reduce false-alarm reactivations
 642      * while doing so, we scan multiple times (analogously to method
 643      * quiescent()) before trying to reactivate.  Because idle workers
 644      * are often not yet blocked (parked), we use a WorkQueue field to
 645      * advertise that a waiter actually needs unparking upon signal.
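A minimal sketch of that advertise-then-park handshake (illustrative only; the field names here are assumptions, not the pool's actual fields): the waiter publishes a parking flag before blocking, so a signaller pays for unpark only when someone may actually be parked.

```java
import java.util.concurrent.locks.LockSupport;

// Illustrative advertise-then-park handshake: "parking" mirrors the
// WorkQueue field mentioned above in spirit, not in detail.
public class Parker {
    volatile int parking;            // nonzero: owner may be blocked
    volatile boolean released;
    void await() {
        parking = 1;                 // advertise before blocking
        while (!released)            // park can return spuriously; re-check
            LockSupport.park(this);
        parking = 0;
    }
    void signal(Thread owner) {
        released = true;             // write before reading parking
        if (parking != 0)
            LockSupport.unpark(owner);
    }
}
```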

 646      *
 647      * Quiescence. Workers scan looking for work, giving up when they
 648      * don't find any, without being sure that none are available.
 649      * However, some required functionality relies on consensus about
 650      * quiescence (also termination, discussed below). The count
 651      * fields in ctl allow accurate discovery of states in which all
 652      * workers are idle.  However, because external (asynchronous)
 653      * submitters are not part of this vote, these mechanisms
 654      * themselves do not guarantee that the pool is in a quiescent
 655      * state with respect to methods isQuiescent, shutdown (which
 656      * begins termination when quiescent), helpQuiesce, and indirectly
 657      * others including tryCompensate. Method quiescent() is used in
 658      * all of these contexts. It provides checks that all workers are
 659      * idle and there are no submissions that they could poll if they
 660      * were not idle, retrying on inconsistent reads of queues and
 661      * using the runState seqLock to retry on queue array updates.
 662      * (It also reports quiescence if the pool is terminating.) A true
 663      * report means only that there was a moment at which quiescence
 664      * held.  False negatives are inevitable (for example when queue
 665      * indices lag updates, as described above), which is accommodated

 875      * ====================
 876      *
 877      * Regular ForkJoinTasks manage task cancellation (method cancel)
 878      * independently from the interrupt status of threads running
 879      * tasks.  Interrupts are issued internally only while
 880      * terminating, to wake up workers and cancel queued tasks.  By
 881      * default, interrupts are cleared only when necessary to ensure
 882      * that calls to LockSupport.park do not loop indefinitely (park
 883      * returns immediately if the current thread is interrupted).
 884      *
 885      * To comply with ExecutorService specs, we use subclasses of
 886      * abstract class InterruptibleTask for tasks that require
 887      * stronger interruption and cancellation guarantees.  External
 888      * submitters never run these tasks, even if in the common pool
 889      * (as indicated by ForkJoinTask.noUserHelp status bit).
 890      * InterruptibleTasks include a "runner" field (implemented
 891      * similarly to FutureTask) to support cancel(true).  Upon pool
 892      * shutdown, runners are interrupted so they can cancel. Since
 893      * external joining callers never run these tasks, they must await
 894      * cancellation by others, which can occur along several different
 895      * paths. The inability to rely on caller-runs may also require
 896      * extra signalling (resulting in scanning and contention) so is
 897      * done only conditionally in methods push and runWorker.
 898      *
 899      * Across these APIs, rules for reporting exceptions for tasks
 900      * with results accessed via join() differ from those via get(),
 901      * which differ from those invoked using pool submit methods by
 902      * non-workers (which comply with Future.get() specs). Internal
 903      * usages of ForkJoinTasks ignore interrupt status when executing
 904      * or awaiting completion.  Otherwise, reporting task results or
 905      * exceptions is preferred to throwing InterruptedExceptions,
 906      * which are in turn preferred to timeouts. Similarly, completion
 907      * status is preferred to reporting cancellation.  Cancellation is
 908      * reported as an unchecked exception by join(), and by worker
 909      * calls to get(), but is otherwise wrapped in a (checked)
 910      * ExecutionException.
 911      *
 912      * Worker Threads cannot be VirtualThreads, as enforced by
 913      * requiring ForkJoinWorkerThreads in factories.  There are
 914      * several constructions relying on this.  However as of this
 915      * writing, virtual thread bodies are by default run as some form
 916      * of InterruptibleTask.
 917      *

 955      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 956      * most false-sharing misses with respect to other fields. Also,
 957      * ForkJoinPool fields are ordered such that fields less prone to
 958      * contention effects are first, offsetting those that otherwise
 959      * would be, while also reducing total footprint vs using
 960      * multiple @Contended regions, which tends to slow down
 961      * less-contended applications. To help arrange this, some
 962      * non-reference fields are declared as "long" even when ints or
 963      * shorts would suffice.  For class WorkQueue, an
 964      * embedded @Contended region segregates fields most heavily
 965      * updated by owners from those most commonly read by stealers or
 966      * other management.
 967      *
 968      * Initial sizing and resizing of WorkQueue arrays is an even more
 969      * delicate tradeoff because the best strategy systematically
 970      * varies across garbage collectors. Small arrays are better for
 971      * locality and reduce GC scan time, but large arrays reduce both
 972      * direct false-sharing and indirect cases due to GC bookkeeping
 973      * (cardmarks etc), and reduce the number of resizes, which are
 974      * not especially fast because they require atomic transfers.
 975      * Currently, arrays for workers are initialized to be just large
 976      * enough to avoid resizing in most tree-structured tasks, but
 977      * larger for external queues where both false-sharing problems
 978      * and the need for resizing are more common. (Maintenance note:
 979      * any changes in fields, queues, or their uses, or JVM layout
 980      * policies, must be accompanied by re-evaluation of these
 981      * placement and sizing decisions.)
 982      *
 983      * Style notes
 984      * ===========
 985      *
 986      * Memory ordering relies mainly on atomic operations (CAS,
 987      * getAndSet, getAndAdd) along with moded accesses. These use
 988      * jdk-internal Unsafe for atomics and special memory modes,
 989      * rather than VarHandles, to avoid initialization dependencies in
 990      * other jdk components that require early parallelism.  This can
 991      * be awkward and ugly, but also reflects the need to control
 992      * outcomes across the unusual cases that arise in very racy code
 993      * with very few invariants. All atomic task slot updates use
 994      * Unsafe operations requiring offset positions, not indices, as
 995      * computed by method slotOffset. All fields are read into locals
 996      * before use, and null-checked if they are references, even if
 997      * they can never be null under current usages. Usually,
 998      * computations (held in local variables) are defined as soon as
 999      * logically enabled, sometimes to convince compilers that they
1000      * may be performed despite memory ordering constraints.  Array
1001      * accesses using masked indices include checks (that are always

1044      */
1045     static final long DEFAULT_KEEPALIVE = 60_000L;
1046 
1047     /**
1048      * Undershoot tolerance for idle timeouts, also serving as the
1049      * minimum allowed timeout value.
1050      */
1051     static final long TIMEOUT_SLOP = 20L;
1052 
1053     /**
1054      * The default value for common pool maxSpares.  Overridable using
1055      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1056      * system property.  The default value is far in excess of normal
1057      * requirements, but also far short of maximum capacity and typical OS
1058      * thread limits, so allows JVMs to catch misuse/abuse before
1059      * running out of resources needed to do so.
1060      */
1061     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1062 
1063     /**
1064      * Initial capacity of work-stealing queue array for workers.
1065      * Must be a power of two, at least 2. See above.
1066      */
1067     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1068 
1069     /**
1070      * Initial capacity of work-stealing queue array for external queues.
1071      * Must be a power of two, at least 2. See above.
1072      */
1073     static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
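Why these capacities must be powers of two: slot selection masks a monotonically growing index with capacity - 1, so indices wrap without division and even transiently negative values map to valid slots. A hypothetical helper (not in ForkJoinPool) illustrating the arithmetic:

```java
// Hypothetical helper: a power-of-two capacity lets unbounded top/base
// counters map to array slots with a single mask operation.
public class MaskedIndex {
    static int slot(int cap, int seq) {   // cap must be a power of two
        return seq & (cap - 1);
    }
}
```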
1074 
1075     // conversions among short, int, long
1076     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1077     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1078     static final long UMASK           = ~LMASK;      // upper 32 bits
1079 
1080     // masks and sentinels for queue indices
1081     static final int MAX_CAP          = 0x7fff;   // max # workers
1082     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1083     static final int INVALID_ID       = 0x4000;   // unused external queue id
1084 
1085     // pool.runState bits
1086     static final long STOP            = 1L <<  0;   // terminating
1087     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1088     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1089     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1090     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1091 
1092     // spin/sleep limits for runState locking and elsewhere
1093     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1094     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos
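As a rough illustration of how masks like SMASK and LMASK carve a long into fields, the sketch below packs two 16-bit counts into the upper half of a long plus a 32-bit lower field. The shift positions are assumptions for this sketch, not a statement of the exact ctl layout:

```java
// Illustrative packing in the style of the masks above. Extraction
// casts to short/int so sign-extension matches how small signed counts
// round-trip through the packed word.
public class Packing {
    static final int  SMASK = 0xffff;       // (unsigned) short bits
    static final long LMASK = 0xffffffffL;  // lower 32 bits
    static long pack(int rc, int tc, int lower) {
        return ((long)(rc & SMASK) << 48) |
               ((long)(tc & SMASK) << 32) |
               (lower & LMASK);
    }
    static int rc(long c)    { return (short)(c >>> 48); } // signed extract
    static int tc(long c)    { return (short)(c >>> 32); }
    static int lower(long c) { return (int)c; }
}
```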

1231             U.putIntOpaque(this, TOP, v);
1232         }
1233         final void updateArray(ForkJoinTask<?>[] a) {
1234             U.getAndSetReference(this, ARRAY, a);
1235         }
1236         final void unlockPhase() {
1237             U.getAndAddInt(this, PHASE, IDLE);
1238         }
1239         final boolean tryLockPhase() {    // seqlock acquire
1240             int p;
1241             return (((p = phase) & IDLE) != 0 &&
1242                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1243         }
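The unlockPhase/tryLockPhase pair above is a compact seqlock: the IDLE bit doubles as the "unlocked" flag, and adding IDLE both toggles the bit and carries into the version bits. A standalone sketch using AtomicInteger, with the bit position simplified to the lowest bit (the real IDLE bit sits higher):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the phase seqlock: when IDLE is set, "p + IDLE"
// clears the bit and carries into the version bits (lock); when clear,
// adding IDLE sets it back (unlock).
public class PhaseLock {
    static final int IDLE = 1;               // simplification: lowest bit
    final AtomicInteger phase = new AtomicInteger(IDLE); // starts unlocked
    boolean tryLock() {
        int p = phase.get();
        return (p & IDLE) != 0 && phase.compareAndSet(p, p + IDLE);
    }
    void unlock() {
        phase.getAndAdd(IDLE);
    }
    int version() {                          // bumps once per lock cycle
        return phase.get() >>> 1;
    }
}
```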
1244 
1245         /**
1246          * Constructor. For internal queues, most fields are initialized
1247          * upon thread start in pool.registerWorker.
1248          */
1249         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1250                   boolean clearThreadLocals) {
1251             array = new ForkJoinTask<?>[owner == null ?
1252                                         INITIAL_EXTERNAL_QUEUE_CAPACITY :
1253                                         INITIAL_QUEUE_CAPACITY];
1254             this.owner = owner;
1255             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;




1256         }
1257 
1258         /**
1259          * Returns an exportable index (used by ForkJoinWorkerThread).
1260          */
1261         final int getPoolIndex() {
1262             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1263         }
1264 
1265         /**
1266          * Returns the approximate number of tasks in the queue.
1267          */
1268         final int queueSize() {
1269             int unused = phase;             // for ordering effect
1270             return Math.max(top - base, 0); // ignore transient negative
1271         }
1272 
1273         /**
1274          * Pushes a task. Called only by owner or if already locked.
1275          *
1276          * @param task the task; no-op if null
1277          * @param pool the pool to signal if was previously empty, else null
1278          * @param internal if caller owns this queue
1279          * @throws RejectedExecutionException if array could not be resized
1280          */
1281         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1282             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1283             if ((a = array) != null && (cap = a.length) > 0 && // else disabled
1284                 task != null) {
1285                 int pk = task.noUserHelp() + 1;             // prev slot offset
1286                 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1287                     top = s + 1;
1288                     long pos = slotOffset(m & s);
1289                     if (!internal)
1290                         U.putReference(a, pos, task);       // inside lock
1291                     else
1292                         U.getAndSetReference(a, pos, task); // fully fenced
1293                     if (room == 0)                          // resize
1294                         growArray(a, cap, s);
1295                 }
1296                 if (!internal)
1297                     unlockPhase();
1298                 if (room < 0)
1299                     throw new RejectedExecutionException("Queue capacity exceeded");
1300                 if ((room == 0 || a[m & (s - pk)] == null) &&
1301                     pool != null)
1302                     pool.signalWork();   // may have appeared empty
1303             }
1304         }
1305 
1306         /**
1307          * Resizes the queue array unless out of memory.
1308          * @param a old array
1309          * @param cap old array capacity
1310          * @param s current top

1311          */
1312         private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
1313             int newCap = cap << 1;

1314             if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1315                 ForkJoinTask<?>[] newArray = null;
1316                 try {
1317                     newArray = new ForkJoinTask<?>[newCap];
1318                 } catch (OutOfMemoryError ex) {
1319                 }
1320                 if (newArray != null) {               // else throw on next push
1321                     int mask = cap - 1, newMask = newCap - 1;
1322                     for (int k = s, j = cap; j > 0; --j, --k) {
1323                         ForkJoinTask<?> u;            // poll old, push to new
1324                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1325                                  a, slotOffset(k & mask), null)) == null)
1326                             break;                    // lost to pollers
1327                         newArray[k & newMask] = u;
1328                     }
1329                     updateArray(newArray);           // fully fenced
1330                 }
1331             }

1332         }
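The transfer loop above preserves logical positions: each element is re-placed at k & newMask, so indices derived from the same base/top remain valid in the doubled array. A toy single-threaded version of just the transfer (hypothetical names, no concurrency or OOME handling):

```java
// Toy version of growArray's transfer: walk down from top, re-placing
// each element at "k & newMask" so slots that wrapped in the old array
// land at their new, unwrapped positions.
public class GrowDemo {
    static Object[] grow(Object[] a, int top) {
        int cap = a.length, newCap = cap << 1;
        int mask = cap - 1, newMask = newCap - 1;
        Object[] na = new Object[newCap];
        for (int k = top - 1, j = cap; j > 0; --j, --k) {
            Object u = a[k & mask];
            if (u == null)
                break;                 // reached base (or a polled slot)
            na[k & newMask] = u;
        }
        return na;
    }
}
```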
1333 
1334         /**
1335          * Takes next task, if one exists, in order specified by mode,
1336          * so acts as either local-pop or local-poll. Called only by owner.
1337          * @param fifo nonzero if FIFO mode
1338          */
1339         private ForkJoinTask<?> nextLocalTask(int fifo) {
1340             ForkJoinTask<?> t = null;
1341             ForkJoinTask<?>[] a = array;
1342             int b = base, p = top, cap;
1343             if (p - b > 0 && a != null && (cap = a.length) > 0) {
1344                 for (int m = cap - 1, s, nb;;) {
1345                     if (fifo == 0 || (nb = b + 1) == p) {
1346                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347                                  a, slotOffset(m & (s = p - 1)), null)) != null)
1348                             updateTop(s);       // else lost race for only task
1349                         break;
1350                     }
1351                     if ((t = (ForkJoinTask<?>)U.getAndSetReference(

1763         return false;
1764     }
1765 
1766     /**
1767      * Provides a name for ForkJoinWorkerThread constructor.
1768      */
1769     final String nextWorkerThreadName() {
1770         String prefix = workerNamePrefix;
1771         long tid = incrementThreadIds() + 1L;
1772         if (prefix == null) // commonPool has no prefix
1773             prefix = "ForkJoinPool.commonPool-worker-";
1774         return prefix.concat(Long.toString(tid));
1775     }
1776 
1777     /**
1778      * Finishes initializing and records internal queue.
1779      *
1780      * @param w caller's WorkQueue
1781      */
1782     final void registerWorker(WorkQueue w) {
1783         if (w != null && (runState & STOP) == 0L) {

1784             ThreadLocalRandom.localInit();
1785             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1786             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1787             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1788             long stop = lockRunState() & STOP;
1789             try {
1790                 WorkQueue[] qs; int n;
1791                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1792                     for (int k = n, m = n - 1;  ; id += 2) {
1793                         if (qs[id &= m] == null)
1794                             break;
1795                         if ((k -= 2) <= 0) {
1796                             id |= n;
1797                             break;
1798                         }
1799                     }
1800                     w.phase = id | phaseSeq;    // now publishable
1801                     if (id < n)
1802                         qs[id] = w;
1803                     else {                      // expand

1841             do {} while (c != (c = compareAndExchangeCtl(
1842                                    c, ((RC_MASK & (c - RC_UNIT)) |
1843                                        (TC_MASK & (c - TC_UNIT)) |
1844                                        (LMASK & c)))));
1845         }
1846         if (phase != 0 && w != null) {     // remove index unless terminating
1847             long ns = w.nsteals & 0xffffffffL;
1848             if ((runState & STOP) == 0L) {
1849                 WorkQueue[] qs; int n, i;
1850                 if ((lockRunState() & STOP) == 0L &&
1851                     (qs = queues) != null && (n = qs.length) > 0 &&
1852                     qs[i = phase & SMASK & (n - 1)] == w) {
1853                     qs[i] = null;
1854                     stealCount += ns;      // accumulate steals
1855                 }
1856                 unlockRunState();
1857             }
1858         }
1859         if ((tryTerminate(false, false) & STOP) == 0L &&
1860             phase != 0 && w != null && w.source != DROPPED) {
1861             signalWork();                  // possibly replace
1862             w.cancelTasks();               // clean queue

1863         }
1864         if (ex != null)
1865             ForkJoinTask.rethrow(ex);
1866     }
1867 
1868     /**
1869      * Releases an idle worker, or creates one if not enough exist.

1870      */
1871     final void signalWork() {
1872         int pc = parallelism;
1873         for (long c = ctl;;) {
1874             WorkQueue[] qs = queues;
1875             long ac = (c + RC_UNIT) & RC_MASK, nc;
1876             int sp = (int)c, i = sp & SMASK;
1877             if ((short)(c >>> RC_SHIFT) >= pc)
1878                 break;
1879             if (qs == null)
1880                 break;
1881             if (qs.length <= i)
1882                 break;
1883             WorkQueue w = qs[i], v = null;
1884             if (sp == 0) {
1885                 if ((short)(c >>> TC_SHIFT) >= pc)
1886                     break;
1887                 nc = ((c + TC_UNIT) & TC_MASK);
1888             }
1889             else if ((v = w) == null)
1890                 break;
1891             else
1892                 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1893             if (c == (c = compareAndExchangeCtl(c, nc | ac))) {


1894                 if (v == null)
1895                     createWorker();
1896                 else {
1897                     v.phase = sp;
1898                     if (v.parking != 0)
1899                         U.unpark(v.owner);
1900                 }
1901                 break;
1902             }
1903         }
1904     }
1905 
1906     /**
1907      * Releases all waiting workers. Called only during shutdown.
1908      */
1909     private void releaseWaiters() {
1910         for (long c = ctl;;) {
1911             WorkQueue[] qs; WorkQueue v; int sp, i;
1912             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1957                 else if ((e & SHUTDOWN) == 0)
1958                     return 0;
1959                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960                     return 0;
1961                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962                     return 1;                             // enable termination
1963                 else
1964                     break;                                // restart
1965             }
1966         }
1967     }
1968 
1969     /**
1970      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971      * See above for explanation.
1972      *
1973      * @param w caller's WorkQueue (may be null on failed initialization)
1974      */
1975     final void runWorker(WorkQueue w) {
1976         if (w != null) {
1977             int phase = w.phase, r = w.stackPred;     // seed from registerWorker
1978             int fifo = w.config & FIFO, nsteals = 0, src = -1;
1979             for (;;) {
1980                 WorkQueue[] qs;
1981                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1982                 if ((runState & STOP) != 0L || (qs = queues) == null)
1983                     break;
1984                 int n = qs.length, i = r, step = (r >>> 16) | 1;
1985                 boolean rescan = false;
1986                 scan: for (int l = n; l > 0; --l, i += step) {  // scan queues
1987                     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1988                     if ((q = qs[j = i & (n - 1)]) != null &&
1989                         (a = q.array) != null && (cap = a.length) > 0) {
1990                         for (int m = cap - 1, pb = -1, b = q.base;;) {
1991                             ForkJoinTask<?> t; long k;
1992                             t = (ForkJoinTask<?>)U.getReferenceAcquire(
1993                                 a, k = slotOffset(m & b));
1994                             if (b != (b = q.base) || t == null ||
1995                                 !U.compareAndSetReference(a, k, t, null)) {
1996                                 if (a[b & m] == null) {
1997                                     if (rescan)           // end of run
1998                                         break scan;
1999                                     if (a[(b + 1) & m] == null &&
2000                                         a[(b + 2) & m] == null) {
2001                                         break;            // probably empty
2002                                     }
2003                                     if (pb == (pb = b)) { // track progress
2004                                         rescan = true;    // stalled; reorder scan
2005                                         break scan;
2006                                     }
2007                                 }



2008                             }
2009                             else {
2010                                 boolean propagate;
2011                                 int nb = q.base = b + 1, prevSrc = src;
2012                                 w.nsteals = ++nsteals;
2013                                 w.source = src = j;       // volatile
2014                                 rescan = true;
2015                                 int nh = t.noUserHelp();
2016                                 if (propagate =
2017                                     (prevSrc != src || nh != 0) && a[nb & m] != null)
2018                                     signalWork();
2019                                 w.topLevelExec(t, fifo);
2020                                 if ((b = q.base) != nb && !propagate)
2021                                     break scan;          // reduce interference
2022                             }
2023                         }
2024                     }
2025                 }
2026                 if (!rescan) {
2027                     if (((phase = deactivate(w, phase)) & IDLE) != 0)
2028                         break;
2029                     src = -1;                            // re-enable propagation



2030                 }



2031             }
2032         }
2033     }
2034 
2035     /**
2036      * Deactivates and if necessary awaits signal or termination.
2037      *
2038      * @param w the worker
2039      * @param phase current phase
2040      * @return current phase, with IDLE set if worker should exit
2041      */
2042     private int deactivate(WorkQueue w, int phase) {
2043         if (w == null)                        // currently impossible
2044             return IDLE;
2045         int p = phase | IDLE, activePhase = phase + (IDLE << 1);
2046         long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
2047         int sp = w.stackPred = (int)pc;       // set ctl stack link
2048         w.phase = p;
2049         if (!compareAndSetCtl(pc, qc))        // try to enqueue
2050             return w.phase = phase;           // back out on possible signal
2051         int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
2052         if (((e = runState) & STOP) != 0L ||
2053             ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
2054             (qs = queues) == null || (n = qs.length) <= 0)
2055             return IDLE;                      // terminating
2056 
2057         for (int prechecks = Math.min(ac, 2), // reactivation threshold
2058              k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
2059             WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
2060             if (w.phase == activePhase)
2061                 return activePhase;
2062             if (--k < 0)
2063                 return awaitWork(w, p);       // block, drop, or exit
2064             if ((q = qs[k & (n - 1)]) == null)
2065                 Thread.onSpinWait();
2066             else if ((a = q.array) != null && (cap = a.length) > 0 &&
2067                      a[q.base & (cap - 1)] != null && --prechecks < 0 &&
2068                      (int)(c = ctl) == activePhase &&
2069                      compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
2070                 return w.phase = activePhase; // reactivate
2071         }
2072     }
2073 
2074     /**
2075      * Awaits signal or termination.
2076      *
2077      * @param w the work queue
2078      * @param p current phase (known to be idle)
2079      * @return current phase, with IDLE set if worker should exit
2080      */
2081     private int awaitWork(WorkQueue w, int p) {
2082         if (w != null) {
2083             ForkJoinWorkerThread t; long deadline;
2084             if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
2085                 t.resetThreadLocals();          // clear before reactivate
2086             if ((ctl & RC_MASK) > 0L)
2087                 deadline = 0L;
2088             else if ((deadline =
2089                       (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
2090                       System.currentTimeMillis()) == 0L)
2091                 deadline = 1L;                 // avoid zero
2092             int activePhase = p + IDLE;
2093             if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
2094                 LockSupport.setCurrentBlocker(this);
2095                 w.parking = 1;                 // enable unpark
2096                 while ((p = w.phase) != activePhase) {
2097                     boolean trimmable = false; int trim;
2098                     Thread.interrupted();      // clear status
2099                     if ((runState & STOP) != 0L)
2100                         break;
2101                     if (deadline != 0L) {
2102                         if ((trim = tryTrim(w, p, deadline)) > 0)
2103                             break;
2104                         else if (trim < 0)
2105                             deadline = 0L;
2106                         else
2107                             trimmable = true;
2108                     }
2109                     U.park(trimmable, deadline);
2110                 }
2111                 w.parking = 0;
2112                 LockSupport.setCurrentBlocker(null);


2113             }
2114         }
2115         return p;
2116     }
2117 
2118     /**
2119      * Tries to remove and deregister worker after timeout, and release
2120      * another to do the same.
2121      * @return > 0: trimmed, < 0 : not trimmable, else 0
2122      */
2123     private int tryTrim(WorkQueue w, int phase, long deadline) {
2124         long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2125         if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2126             stat = -1;                      // no longer ctl top
2127         else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2128             stat = 0;                       // spurious wakeup
2129         else if (!compareAndSetCtl(
2130                      c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2131                                (TC_MASK & (c - TC_UNIT)))))
2132             stat = -1;                      // lost race to signaller
2133         else {
2134             stat = 1;
2135             w.source = DROPPED;

2544         else
2545             return 0;
2546     }
2547 
2548     /**
2549      * Gets and removes a local or stolen task for the given worker.
2550      *
2551      * @return a task, if available
2552      */
2553     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554         ForkJoinTask<?> t;
2555         if (w == null || (t = w.nextLocalTask()) == null)
2556             t = pollScan(false);
2557         return t;
2558     }
2559 
2560     // External operations
2561 
2562     /**
2563      * Finds and locks a WorkQueue for an external submitter, or
2564      * throws RejectedExecutionException if shutdown or terminating.
2565      * @param r current ThreadLocalRandom.getProbe() value
2566      * @param rejectOnShutdown true if RejectedExecutionException
2567      *        should be thrown when shutdown (else only if terminating)
2568      */
2569     private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
2570         int reuse;                                   // nonzero if prefer create
2571         if ((reuse = r) == 0) {
2572             ThreadLocalRandom.localInit();           // initialize caller's probe
2573             r = ThreadLocalRandom.getProbe();
2574         }
2575         for (int probes = 0; ; ++probes) {
2576             int n, i, id; WorkQueue[] qs; WorkQueue q;
2577             if ((qs = queues) == null)
2578                 break;
2579             if ((n = qs.length) <= 0)
2580                 break;
2581             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582                 WorkQueue w = new WorkQueue(null, id, 0, false);
2583                 w.phase = id;
2584                 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585                                   rejectOnShutdown);
2586                 if (!reject && queues == qs && qs[i] == null)
2587                     q = qs[i] = w;                   // else lost race to install
2588                 unlockRunState();
2589                 if (q != null)
2590                     return q;
2591                 if (reject)
2592                     break;
2593                 reuse = 0;
2594             }
2595             if (reuse == 0 || !q.tryLockPhase()) {   // move index
2596                 if (reuse == 0) {
2597                     if (probes >= n >> 1)
2598                         reuse = r;                   // stop preferring free slot
2599                 }
2600                 else if (q != null)
2601                     reuse = 0;                       // probe on collision
2602                 r = ThreadLocalRandom.advanceProbe(r);
2603             }
2604             else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605                 q.unlockPhase();                     // check while q lock held
2606                 break;
2607             }
2608             else
2609                 return q;


2610         }
2611         throw new RejectedExecutionException();
2612     }
2613 
2614     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617             (wt = (ForkJoinWorkerThread)t).pool == this) {
2618             internal = true;
2619             q = wt.workQueue;
2620         }
2621         else {                     // find and lock queue
2622             internal = false;
2623             q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624         }
2625         q.push(task, signalIfEmpty ? this : null, internal);
2626         return task;
2627     }
2628 
2629     /**
2630      * Returns queue for an external submission, bypassing call to
2631      * submissionQueue if already established and unlocked.
2632      */
2633     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634         WorkQueue[] qs; WorkQueue q; int n;
2635         int r = ThreadLocalRandom.getProbe();
2636         return (((qs = queues) != null && (n = qs.length) > 0 &&
2637                  (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638                  q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639     }
2640 
2641     /**
2642      * Returns queue for an external thread, if one exists that has
2643      * possibly ever submitted to the given pool (nonzero probe), or
2644      * null if none.
2645      */
2646     static WorkQueue externalQueue(ForkJoinPool p) {
2647         WorkQueue[] qs; int n;
2648         int r = ThreadLocalRandom.getProbe();
2649         return (p != null && (qs = p.queues) != null &&
2650                 (n = qs.length) > 0 && r != 0) ?
2651             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2652     }
2653 
2654     /**
2655      * Returns external queue for common pool.
2656      */
2657     static WorkQueue commonQueue() {
2658         return externalQueue(common);
2659     }
2660 

 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule that task producers and
 555      * consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
 561      *
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  Method signalWork is invoked in two cases:
 564      * (1) When a task is pushed onto an empty queue, and (2) When a
 565      * worker takes a top-level task from a queue that has additional
 566      * tasks. Together, these suffice in O(log(#threads)) steps to
 567      * fully activate with at least enough workers, and ideally no
 568      * more than required.  This ideal is unobtainable: Callers do not
 569      * know whether another worker will finish its current task and
 570      * poll for others without need of a signal (which is otherwise an
 571      * advantage of work-stealing vs other schemes), and also must
 572      * conservatively estimate the triggering conditions of emptiness
 573      * or non-emptiness; all of which usually cause more activations
 574      * than necessary (see below). (Method signalWork is also used as
 575      * a failsafe in case of Thread failures in deregisterWorker.)
 576      *
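As a rough sketch of the two trigger cases above (illustrative only: a plain ArrayDeque and a counter stand in for the pool's WorkQueue and signalWork, so all names here are hypothetical):

```java
import java.util.ArrayDeque;

// Illustrative sketch only, not the pool's internals: the two cases in
// which a push/take should signal, per the comment above.
class SignalSketch {
    final ArrayDeque<Runnable> q = new ArrayDeque<>();
    int signals;                           // stands in for signalWork()

    void push(Runnable task) {             // case 1: push onto empty queue
        boolean wasEmpty = q.isEmpty();
        q.addLast(task);
        if (wasEmpty)
            ++signals;                     // another worker may be needed
    }

    Runnable take() {                      // case 2: more tasks remain
        Runnable t = q.pollFirst();
        if (t != null && !q.isEmpty())
            ++signals;                     // help is useful for the rest
        return t;
    }
}
```

Both checks are conservative: a queue seen as empty (or non-empty) may change before the signal takes effect, which is why the real code filters and tolerates unnecessary signals.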
 577      * Top-Level scheduling
 578      * ====================
 579      *
 580      * Scanning. Method runWorker performs top-level scanning for (and
 581      * execution of) tasks by polling a pseudo-random permutation of
 582      * the array (by starting at a given index, and using a constant
 583      * cyclically exhaustive stride.)  It uses the same basic polling
 584      * method as WorkQueue.poll(), but restarts with a different
 585      * permutation on each rescan.  The pseudorandom generator need
 586      * not have high-quality statistical properties in the long
 587      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 588      * from ThreadLocalRandom probes, which are cheap and suffice.
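For concreteness, one classic Marsaglia XorShift step (a sketch; the pool's actual seeding and stride handling differ) looks like:

```java
// Classic 32-bit Marsaglia XorShift (13, 17, 5): a bijection on nonzero
// ints, cheap enough to reseed on each rescan, and statistically
// adequate for choosing scan start indices.
class XorShiftSketch {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }
    // map a random word to a slot of a power-of-two-length queue array
    static int scanIndex(int r, int n) {
        return r & (n - 1);
    }
}
```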
 589      *
 590      * Deactivation. When no tasks are found by a worker in runWorker,
 591      * it invokes awaitWork, which first deactivates (to an IDLE
 592      * phase).  Avoiding missed signals during deactivation requires a
 593      * (conservative) rescan, reactivating if there may be tasks to
 594      * poll. Because idle workers are often not yet blocked (parked),
 595      * we use a WorkQueue field to advertise that a waiter actually
 596      * needs unparking upon signal.
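The advertise-then-rescan rule can be caricatured as follows (a hypothetical sketch with an AtomicBoolean standing in for the phase/ctl machinery):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Illustrative sketch only: publish idleness before the conservative
// rescan, so producers that observe the idle state know to signal.
class DeactivateSketch {
    final AtomicBoolean idle = new AtomicBoolean();

    // returns true if the caller may park; false if it must rescan
    boolean tryDeactivate(BooleanSupplier mayHaveTasks) {
        idle.set(true);                    // publish idleness first
        if (mayHaveTasks.getAsBoolean()) { // conservative rescan
            idle.set(false);               // reactivate; tasks may be pollable
            return false;
        }
        return true;                       // safe to park and await unpark
    }
}
```

Ordering matters: rescanning before publishing idleness would leave a window in which a producer sees an active worker, skips the signal, and the worker then parks on a non-empty pool.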
 597      *
 598      * When tasks are constructed as (recursive) dags, top-level
 599      * scanning is usually infrequent, and doesn't encounter most
 600      * of the following problems addressed by runWorker and awaitWork:
 601      *
 602      * Locality. Polls are organized into "runs", continuing until
 603      * empty or contended, while also minimizing interference by
 604      * postponing bookkeeping to ends of runs. This may reduce
 605      * fairness, which is partially counteracted by the following.
 606      *
 607      * Contention. When many workers try to poll few queues, they
 608      * often collide, generating CAS failures and disrupting locality
 609      * of workers already running their tasks. This also leads to
 610      * stalls when tasks cannot be taken because other workers have
 611      * not finished poll operations, which is detected by reading
 612      * ahead in queue arrays. In both cases, workers restart scans in a
 613      * way that approximates randomized backoff.
 614      *
 615      * Oversignalling. When many short top-level tasks are present in
 616      * a small number of queues, the above signalling strategy may
 617      * activate many more workers than needed, worsening locality and
 618      * contention problems, while also generating more global
 619      * contention (the ctl field is CASed on every activation and
 620      * deactivation). We filter out (both in runWorker and
 621      * signalWork) attempted signals that are surely not needed
 622      * because the signalled tasks are already taken.
 623      *
 624      * Shutdown and Quiescence
 625      * =======================
 626      *
 627      * Quiescence. Workers scan looking for work, giving up when they
 628      * don't find any, without being sure that none are available.
 629      * However, some required functionality relies on consensus about
 630      * quiescence (also termination, discussed below). The count
 631      * fields in ctl allow accurate discovery of states in which all
 632      * workers are idle.  However, because external (asynchronous)
 633      * submitters are not part of this vote, these mechanisms
 634      * themselves do not guarantee that the pool is in a quiescent
 635      * state with respect to methods isQuiescent, shutdown (which
 636      * begins termination when quiescent), helpQuiesce, and indirectly
 637      * others including tryCompensate. Method quiescent() is used in
 638      * all of these contexts. It provides checks that all workers are
 639      * idle and there are no submissions that they could poll if they
 640      * were not idle, retrying on inconsistent reads of queues and
 641      * using the runState seqLock to retry on queue array updates.
 642      * (It also reports quiescence if the pool is terminating.) A true
 643      * report means only that there was a moment at which quiescence
 644      * held.  False negatives are inevitable (for example when queue
 645      * indices lag updates, as described above), which is accommodated

 855      * ====================
 856      *
 857      * Regular ForkJoinTasks manage task cancellation (method cancel)
 858      * independently from the interrupt status of threads running
 859      * tasks.  Interrupts are issued internally only while
 860      * terminating, to wake up workers and cancel queued tasks.  By
 861      * default, interrupts are cleared only when necessary to ensure
 862      * that calls to LockSupport.park do not loop indefinitely (park
 863      * returns immediately if the current thread is interrupted).
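The park idiom this refers to can be sketched as (illustrative only; the actual loop in awaitWork also checks phase, runState, and trimming):

```java
import java.util.concurrent.locks.LockSupport;

// Sketch of the idiom above: clear interrupt status before parking,
// since park returns immediately while the thread is interrupted.
class ParkSketch {
    static void awaitBriefly() {
        Thread.interrupted();              // clear status (discard result)
        LockSupport.parkNanos(1_000L);     // now actually blocks (~1 usec)
    }
}
```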
 864      *
 865      * To comply with ExecutorService specs, we use subclasses of
 866      * abstract class InterruptibleTask for tasks that require
 867      * stronger interruption and cancellation guarantees.  External
 868      * submitters never run these tasks, even if in the common pool
 869      * (as indicated by ForkJoinTask.noUserHelp status bit).
 870      * InterruptibleTasks include a "runner" field (implemented
 871      * similarly to FutureTask) to support cancel(true).  Upon pool
 872      * shutdown, runners are interrupted so they can cancel. Since
 873      * external joining callers never run these tasks, they must await
 874      * cancellation by others, which can occur along several different
 875      * paths.


 876      *
 877      * Across these APIs, rules for reporting exceptions for tasks
 878      * with results accessed via join() differ from those via get(),
 879      * which differ from those invoked using pool submit methods by
 880      * non-workers (which comply with Future.get() specs). Internal
 881      * usages of ForkJoinTasks ignore interrupt status when executing
 882      * or awaiting completion.  Otherwise, reporting task results or
 883      * exceptions is preferred to throwing InterruptedExceptions,
 884      * which are in turn preferred to timeouts. Similarly, completion
 885      * status is preferred to reporting cancellation.  Cancellation is
 886      * reported as an unchecked exception by join(), and by worker
 887      * calls to get(), but is otherwise wrapped in a (checked)
 888      * ExecutionException.
 889      *
 890      * Worker Threads cannot be VirtualThreads, as enforced by
 891      * requiring ForkJoinWorkerThreads in factories.  There are
 892      * several constructions relying on this.  However as of this
 893      * writing, virtual thread bodies are by default run as some form
 894      * of InterruptibleTask.
 895      *

 933      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 934      * most false-sharing misses with respect to other fields. Also,
 935      * ForkJoinPool fields are ordered such that fields less prone to
 936      * contention effects are first, offsetting those that otherwise
 937      * would be, while also reducing total footprint vs using
 938      * multiple @Contended regions, which tends to slow down
 939      * less-contended applications. To help arrange this, some
 940      * non-reference fields are declared as "long" even when ints or
 941      * shorts would suffice.  For class WorkQueue, an
 942      * embedded @Contended region segregates fields most heavily
 943      * updated by owners from those most commonly read by stealers or
 944      * other management.
 945      *
 946      * Initial sizing and resizing of WorkQueue arrays is an even more
 947      * delicate tradeoff because the best strategy systematically
 948      * varies across garbage collectors. Small arrays are better for
 949      * locality and reduce GC scan time, but large arrays reduce both
 950      * direct false-sharing and indirect cases due to GC bookkeeping
 951      * (cardmarks etc), and reduce the number of resizes, which are
 952      * not especially fast because they require atomic transfers.
 953      * Currently, arrays are initialized to be just large enough to
 954      * avoid resizing in most tree-structured tasks, but grow rapidly
 955      * until large.  (Maintenance note: any changes in fields, queues,
 956      * or their uses, or JVM layout policies, must be accompanied by
 957      * re-evaluation of these placement and sizing decisions.)
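The "grow rapidly until large" schedule matches the capacity step visible in WorkQueue.growArray below (quadruple while below 1 << 16, then double); as a sketch:

```java
// Capacity step mirroring growArray: quadruple while small to reach a
// stable size quickly, then double to limit footprint growth.
class SizingSketch {
    static int nextCapacity(int cap) {
        return (cap >= 1 << 16) ? cap << 1 : cap << 2;
    }
}
```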


 958      *
 959      * Style notes
 960      * ===========
 961      *
 962      * Memory ordering relies mainly on atomic operations (CAS,
 963      * getAndSet, getAndAdd) along with moded accesses. These use
 964      * jdk-internal Unsafe for atomics and special memory modes,
 965      * rather than VarHandles, to avoid initialization dependencies in
 966      * other jdk components that require early parallelism.  This can
 967      * be awkward and ugly, but also reflects the need to control
 968      * outcomes across the unusual cases that arise in very racy code
 969      * with very few invariants. All atomic task slot updates use
 970      * Unsafe operations requiring offset positions, not indices, as
 971      * computed by method slotOffset. All fields are read into locals
 972      * before use, and null-checked if they are references, even if
 973      * they can never be null under current usages. Usually,
 974      * computations (held in local variables) are defined as soon as
 975      * logically enabled, sometimes to convince compilers that they
 976      * may be performed despite memory ordering constraints.  Array
 977      * accesses using masked indices include checks (that are always

1020      */
1021     static final long DEFAULT_KEEPALIVE = 60_000L;
1022 
1023     /**
1024      * Undershoot tolerance for idle timeouts, also serving as the
1025      * minimum allowed timeout value.
1026      */
1027     static final long TIMEOUT_SLOP = 20L;
1028 
1029     /**
1030      * The default value for common pool maxSpares.  Overridable using
1031      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1032      * system property.  The default value is far in excess of normal
1033      * requirements, but also far short of maximum capacity and typical OS
1034      * thread limits, so allows JVMs to catch misuse/abuse before
1035      * running out of resources needed to do so.
1036      */
1037     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1038 
1039     /**
1040      * Initial capacity of work-stealing queue array.
1041      * Must be a power of two, at least 2. See above.
1042      */
1043     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1044 
1045     // conversions among short, int, long
1046     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1047     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1048     static final long UMASK           = ~LMASK;      // upper 32 bits
1049 
1050     // masks and sentinels for queue indices
1051     static final int MAX_CAP          = 0x7fff;   // max # workers
1052     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1053     static final int INVALID_ID       = 0x4000;   // unused external queue id
1054 
1055     // pool.runState bits
1056     static final long STOP            = 1L <<  0;   // terminating
1057     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1058     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1059     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1060     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1061 
1062     // spin/sleep limits for runState locking and elsewhere
1063     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1064     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos

1201             U.putIntOpaque(this, TOP, v);
1202         }
1203         final void updateArray(ForkJoinTask<?>[] a) {
1204             U.getAndSetReference(this, ARRAY, a);
1205         }
1206         final void unlockPhase() {
1207             U.getAndAddInt(this, PHASE, IDLE);
1208         }
1209         final boolean tryLockPhase() {    // seqlock acquire
1210             int p;
1211             return (((p = phase) & IDLE) != 0 &&
1212                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1213         }
1214 
1215         /**
1216          * Constructor. For internal queues, most fields are initialized
1217          * upon thread start in pool.registerWorker.
1218          */
1219         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1220                   boolean clearThreadLocals) {




1221             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1222             if ((this.owner = owner) == null) {
1223                 phase = id | IDLE;
1224                 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1225             }
1226         }
1227 
1228         /**
1229          * Returns an exportable index (used by ForkJoinWorkerThread).
1230          */
1231         final int getPoolIndex() {
1232             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1233         }
1234 
1235         /**
1236          * Returns the approximate number of tasks in the queue.
1237          */
1238         final int queueSize() {
1239             int unused = phase;             // for ordering effect
1240             return Math.max(top - base, 0); // ignore transient negative
1241         }
1242 
1243         /**
1244          * Pushes a task. Called only by owner or if already locked.
1245          *
1246          * @param task the task; no-op if null
1247          * @param pool the pool to signal if this queue was previously empty, else null
1248          * @param internal if caller owns this queue
1249          * @throws RejectedExecutionException if array could not be resized
1250          */
1251         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1252             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1253             if ((a = array) != null && (cap = a.length) > 0) { // else disabled


1254                 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1255                     top = s + 1;
1256                     long pos = slotOffset(m & s);
1257                     if (!internal)
1258                         U.putReference(a, pos, task);       // inside lock
1259                     else
1260                         U.getAndSetReference(a, pos, task); // fully fenced
1261                     if (room == 0 && (a = growArray(a, cap, s)) != null)
1262                         m = a.length - 1;                   // resize
1263                 }
1264                 if (!internal)
1265                     unlockPhase();
1266                 if (room < 0)
1267                     throw new RejectedExecutionException("Queue capacity exceeded");
1268                 if (pool != null && a != null &&
1269                     U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)
1270                     pool.signalWork(a, m & s);   // may have appeared empty
1271             }
1272         }
1273 
1274         /**
1275          * Resizes the queue array unless out of memory.
1276          * @param a old array
1277          * @param cap old array capacity
1278          * @param s current top
1279          * @return new array, or null on failure
1280          */
1281         private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
1282             int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
1283             ForkJoinTask<?>[] newArray = null;
1284             if (a != null && a.length == cap && cap > 0 && newCap > 0) {

1285                 try {
1286                     newArray = new ForkJoinTask<?>[newCap];
1287                 } catch (OutOfMemoryError ex) {
1288                 }
1289                 if (newArray != null) {               // else throw on next push
1290                     int mask = cap - 1, newMask = newCap - 1;
1291                     for (int k = s, j = cap; j > 0; --j, --k) {
1292                         ForkJoinTask<?> u;            // poll old, push to new
1293                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1294                                  a, slotOffset(k & mask), null)) == null)
1295                             break;                    // lost to pollers
1296                         newArray[k & newMask] = u;
1297                     }
1298                     updateArray(newArray);           // fully fenced
1299                 }
1300             }
1301             return newArray;
1302         }
1303 
1304         /**
1305          * Takes next task, if one exists, in order specified by mode,
1306          * so acts as either local-pop or local-poll. Called only by owner.
1307          * @param fifo nonzero if FIFO mode
1308          */
1309         private ForkJoinTask<?> nextLocalTask(int fifo) {
1310             ForkJoinTask<?> t = null;
1311             ForkJoinTask<?>[] a = array;
1312             int b = base, p = top, cap;
1313             if (p - b > 0 && a != null && (cap = a.length) > 0) {
1314                 for (int m = cap - 1, s, nb;;) {
1315                     if (fifo == 0 || (nb = b + 1) == p) {
1316                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1317                                  a, slotOffset(m & (s = p - 1)), null)) != null)
1318                             updateTop(s);       // else lost race for only task
1319                         break;
1320                     }
1321                     if ((t = (ForkJoinTask<?>)U.getAndSetReference(

1733         return false;
1734     }
1735 
1736     /**
1737      * Provides a name for ForkJoinWorkerThread constructor.
1738      */
1739     final String nextWorkerThreadName() {
1740         String prefix = workerNamePrefix;
1741         long tid = incrementThreadIds() + 1L;
1742         if (prefix == null) // commonPool has no prefix
1743             prefix = "ForkJoinPool.commonPool-worker-";
1744         return prefix.concat(Long.toString(tid));
1745     }
1746 
1747     /**
1748      * Finishes initializing and records internal queue.
1749      *
1750      * @param w caller's WorkQueue
1751      */
1752     final void registerWorker(WorkQueue w) {
1753         if (w != null) {
1754             w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1755             ThreadLocalRandom.localInit();
1756             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1757             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1758             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1759             long stop = lockRunState() & STOP;
1760             try {
1761                 WorkQueue[] qs; int n;
1762                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1763                     for (int k = n, m = n - 1;  ; id += 2) {
1764                         if (qs[id &= m] == null)
1765                             break;
1766                         if ((k -= 2) <= 0) {
1767                             id |= n;
1768                             break;
1769                         }
1770                     }
1771                     w.phase = id | phaseSeq;    // now publishable
1772                     if (id < n)
1773                         qs[id] = w;
1774                     else {                      // expand

1812             do {} while (c != (c = compareAndExchangeCtl(
1813                                    c, ((RC_MASK & (c - RC_UNIT)) |
1814                                        (TC_MASK & (c - TC_UNIT)) |
1815                                        (LMASK & c)))));
1816         }
1817         if (phase != 0 && w != null) {     // remove index unless terminating
1818             long ns = w.nsteals & 0xffffffffL;
1819             if ((runState & STOP) == 0L) {
1820                 WorkQueue[] qs; int n, i;
1821                 if ((lockRunState() & STOP) == 0L &&
1822                     (qs = queues) != null && (n = qs.length) > 0 &&
1823                     qs[i = phase & SMASK & (n - 1)] == w) {
1824                     qs[i] = null;
1825                     stealCount += ns;      // accumulate steals
1826                 }
1827                 unlockRunState();
1828             }
1829         }
1830         if ((tryTerminate(false, false) & STOP) == 0L &&
1831             phase != 0 && w != null && w.source != DROPPED) {
1832             w.cancelTasks();               // clean queue
1833             signalWork(null, 0);           // possibly replace
1834         }
1835         if (ex != null)
1836             ForkJoinTask.rethrow(ex);
1837     }
1838 
1839     /**
1840      * Releases an idle worker, or creates one if not enough exist,
1841      * giving up if array a is nonnull and task at a[k] already taken.
1842      */
1843     final void signalWork(ForkJoinTask<?>[] a, int k) {
1844         int pc = parallelism;
1845         for (long c = ctl;;) {
1846             WorkQueue[] qs = queues;
1847             long ac = (c + RC_UNIT) & RC_MASK, nc;
1848             int sp = (int)c, i = sp & SMASK;
1849             if ((short)(c >>> RC_SHIFT) >= pc)
1850                 break;
1851             if (qs == null)
1852                 break;
1853             if (qs.length <= i)
1854                 break;
1855             WorkQueue w = qs[i], v = null;
1856             if (sp == 0) {
1857                 if ((short)(c >>> TC_SHIFT) >= pc)
1858                     break;
1859                 nc = ((c + TC_UNIT) & TC_MASK) | ac;
1860             }
1861             else if ((v = w) == null)
1862                 break;
1863             else
1864                 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
1865             if (a != null && k < a.length && k >= 0 && a[k] == null)
1866                 break;
1867             if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
1868                 if (v == null)
1869                     createWorker();
1870                 else {
1871                     v.phase = sp;
1872                     if (v.parking != 0)
1873                         U.unpark(v.owner);
1874                 }
1875                 break;
1876             }
1877         }
1878     }
1879 
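The ctl word manipulated in signalWork packs two 16-bit counts (compared against parallelism via the `(short)` casts above) over a 32-bit idle-stack prefix. The following is a simplified standalone sketch of that bit layout; the class and helper names are hypothetical, and the real field additionally biases the counts, so treat this only as an illustration of the packing:

```java
// Sketch (not part of ForkJoinPool): a 64-bit control word holding a
// releasable-worker count in bits 48..63, a total-worker count in bits
// 32..47, and an idle-stack top tag in the low 32 bits.
public class CtlLayoutDemo {
    static final int RC_SHIFT = 48, TC_SHIFT = 32;   // mirror field layout
    static final long LMASK = 0xffffffffL;           // low 32: stack top

    static long pack(int rc, int tc, int stackTop) {
        return ((rc & 0xffffL) << RC_SHIFT)          // mask to field width
            | ((tc & 0xffffL) << TC_SHIFT)
            | (stackTop & LMASK);
    }
    static int releasable(long c) { return (short)(c >>> RC_SHIFT); }
    static int total(long c)      { return (short)(c >>> TC_SHIFT); }
    static int stackTop(long c)   { return (int)c; }
}
```

The `(short)` casts recover signed counts, matching the comparisons `(short)(c >>> RC_SHIFT) >= pc` in signalWork.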
1880     /**
1881      * Releases all waiting workers. Called only during shutdown.
1882      */
1883     private void releaseWaiters() {
1884         for (long c = ctl;;) {
1885             WorkQueue[] qs; WorkQueue v; int sp, i;
1886             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1887                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1931                 else if ((e & SHUTDOWN) == 0)
1932                     return 0;
1933                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1934                     return 0;
1935                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1936                     return 1;                             // enable termination
1937                 else
1938                     break;                                // restart
1939             }
1940         }
1941     }
1942 
1943     /**
1944      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1945      * See above for explanation.
1946      *
1947      * @param w caller's WorkQueue (may be null on failed initialization)
1948      */
1949     final void runWorker(WorkQueue w) {
1950         if (w != null) {
1951             int phase = w.phase;
1952             int r = w.stackPred, origin = r;              // seed from registerWorker
1953             int cfg = w.config, fifo = cfg & FIFO, clearLocals = cfg & CLEAR_TLS;
1954             int src = -1;                                 // current source queue
1955             int taken = 0, ptaken = 0, staken = 0;        // takes per phase and scan
1956             rescan: while ((runState & STOP) == 0L) {
1957                 WorkQueue[] qs = queues;
1958                 int n = (qs == null) ? 0 : qs.length;
1959                 int i = origin, step = (r >>> 16) | 1;
1960                 r ^= r << 13; r ^= r >>> 17; origin = r ^= r << 5; // xorshift
1961                 for (int l = n; l > 0; --l, i += step) {  // scan queues
1962                     WorkQueue q; int j;
1963                     if ((q = qs[j = i & (n - 1)]) != null) {
1964                         for (;;) {                        // poll q
1965                             ForkJoinTask<?>[] a; int cap, b, m, k;
1966                             if ((a = q.array) == null || (cap = a.length) <= 0)
1967                                 break;
1968                             long bp = slotOffset(k = (b = q.base) & (m = cap - 1));
1969                             int nb = b + 1, nk = nb & m;
1970                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1971                                 U.getReferenceAcquire(a, bp);
1972                             if (q.array != a || q.base != b || a[k] != t)
1973                                 continue;                 // inconsistent
1974                             if (t == null) {
1975                                 if (taken != staken) {
1976                                     staken = taken;
1977                                     continue rescan;      // sweep until clean
1978                                 }
1979                                 if (a[nk] != null || a[(b + 2) & m] != null)
1980                                     continue rescan;      // stalled; reorder scan
1981                                 break;                    // probably empty
1982                             }
1983                             if (U.compareAndSetReference(a, bp, t, null)) {
1984                                 q.base = nb;
1985                                 Object nt = U.getReferenceAcquire
1986                                     (a, slotOffset(nk));  // confirm below
1987                                 ++taken;
1988                                 if (src != j)
1989                                     w.source = src = j;
1990                                 if (nt != null && nt == a[nk])
1991                                     signalWork(a, nk);    // propagate
1992                                 w.topLevelExec(t, fifo);  // run t & its subtasks
1993                             }
1994                         }
1995                     }
1996                 }
1997                 if (taken != ptaken) {                    // end run
1998                     ptaken = taken;
1999                     origin = src;                         // hint for next run
2000                     if (clearLocals != 0 &&
2001                         Thread.currentThread() instanceof ForkJoinWorkerThread wt)
2002                         wt.resetThreadLocals();
2003                     w.nsteals = taken;
2004                 }
2005                 w.phase = phase += IDLE;                  // deactivate
2006                 if ((phase = awaitWork(w, phase)) == IDLE)
2007                     break;
2008             }
2009         }
2010     }
2011 
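The scan-randomization step in runWorker (`r ^= r << 13; r ^= r >>> 17; r ^= r << 5`) is Marsaglia's xorshift generator. A minimal standalone sketch (hypothetical class name) showing the property this use relies on: a nonzero seed never maps to zero, so scan origins and steps keep varying across runs:

```java
// Sketch (not part of ForkJoinPool): the 32-bit xorshift step used to
// randomize queue-scan origins. The transition is a bijection on
// nonzero ints, so the sequence never degenerates to all-zero state.
public class XorShiftDemo {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }
}
```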
2012     /**
2013      * Awaits signal or termination.
2014      *
2015      * @param w the work queue
2016      * @param p current phase (known to be idle)
2017      * @return current phase or IDLE if worker should exit
2018      */
2019     private int awaitWork(WorkQueue w, int p) {
2020         if (w == null)                        // never true; hoist checks
2021             return IDLE;
2022         int activePhase = p + IDLE;
2023         long ap = activePhase & LMASK, pc = ctl, qc;
2024         do {                                  // enqueue
2025             qc = ap | ((pc - RC_UNIT) & UMASK);
2026             w.stackPred = (int)pc;            // set ctl stack link
2027         } while (pc != (pc = compareAndExchangeCtl(pc, qc)));
2028         long psp = pc & LMASK;                // reactivation stack prefix
2029         WorkQueue[] qs; int n;                // missed signal check
2030         if ((runState & STOP) != 0 || (qs = queues) == null || (n = qs.length) <= 0)
2031             return IDLE;                      // already terminating
2032         for (int m = n - 1, origin = p + 1, i = 0; i < m; ++i) {
2033             WorkQueue q; long cc;             // stagger origins
2034             if ((q = qs[(origin + i) & m]) != null && q.top - q.base > 0) {
2035                 if ((p = w.phase) == activePhase)
2036                     break;
2037                 if ((int)(cc = ctl) == activePhase &&
2038                     compareAndSetCtl(cc, psp | ((cc + RC_UNIT) & UMASK))) {
2039                     p = w.phase = activePhase;
2040                     break;                    // reactivated
2041                 }
2042             }
2043         }
2044         if (p != activePhase && (p = w.phase) != activePhase) {
2045             long deadline = 0L, c, e;         // quiescence checks
2046             if (((e = runState) & STOP) != 0)
2047                 return IDLE;
2048             else if ((int)(c = ctl) != activePhase || (c & RC_MASK) > 0L) {
2049                 for (int spins = n; (p = w.phase) != activePhase && --spins > 0;)
2050                     Thread.onSpinWait();      // spin unless possibly quiescent
2051             }
2052             else if ((e & SHUTDOWN) != 0L && quiescent() > 0)
2053                 return IDLE;                  // quiescent termination
2054             else {                            // use trim timeout
2055                 long d = ((w.source != INVALID_ID) ? keepAlive :
2056                           TIMEOUT_SLOP) + System.currentTimeMillis();
2057                 deadline = (d == 0L)? 1L : d; // avoid zero
2058                 p = w.phase;
2059             }
2060             if (p != activePhase) {           // block
2061                 LockSupport.setCurrentBlocker(this);
2062                 w.parking = 1;                // enable unpark
2063                 while ((p = w.phase) != activePhase) {
2064                     boolean trimmable = false; int trim;
2065                     Thread.interrupted();     // clear status
2066                     if ((runState & STOP) != 0L)
2067                         break;
2068                     if (deadline != 0L) {
2069                         if ((trim = tryTrim(w, p, deadline)) > 0)
2070                             break;
2071                         else if (trim < 0)
2072                             deadline = 0L;
2073                         else
2074                             trimmable = true;
2075                     }
2076                     U.park(trimmable, deadline);
2077                 }
2078                 w.parking = 0;
2079                 LockSupport.setCurrentBlocker(null);
2080                 if (p != activePhase)
2081                     return IDLE;
2082             }
2083         }
2084         return activePhase;
2085     }
2086 
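The enqueue loop at the top of awaitWork (write `stackPred`, then CAS the low bits of ctl) together with signalWork's pop forms a Treiber stack of idle workers threaded through an integer word. A simplified standalone sketch of that idiom (hypothetical class; the real code additionally tags entries with phase version bits to avoid ABA, omitted here):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch (not part of ForkJoinPool): a Treiber stack of small integer
// ids, where pred[id] plays the role of WorkQueue.stackPred and the
// AtomicInteger plays the role of ctl's low bits. 0 means empty.
public class IdleStackDemo {
    final AtomicInteger top = new AtomicInteger(0);  // ids are 1-based
    final int[] pred;                                // pred[id] = prior top
    IdleStackDemo(int capacity) { pred = new int[capacity + 1]; }

    void push(int id) {                  // cf. awaitWork's enqueue loop
        int t;
        do {
            t = top.get();
            pred[id] = t;                // set stack link before CAS
        } while (!top.compareAndSet(t, id));
    }
    int pop() {                          // cf. signalWork's release
        int t;
        do {
            if ((t = top.get()) == 0)
                return 0;                // empty
        } while (!top.compareAndSet(t, pred[t]));
        return t;
    }
}
```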
2087     /**
2088      * Tries to remove and deregister worker after timeout, and release
2089      * another to do the same.
2090      * @return > 0: trimmed, < 0: not trimmable, else 0
2091      */
2092     private int tryTrim(WorkQueue w, int phase, long deadline) {
2093         long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2094         if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2095             stat = -1;                      // no longer ctl top
2096         else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2097             stat = 0;                       // spurious wakeup
2098         else if (!compareAndSetCtl(
2099                      c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2100                                (TC_MASK & (c - TC_UNIT)))))
2101             stat = -1;                      // lost race to signaller
2102         else {
2103             stat = 1;
2104             w.source = DROPPED;

2513         else
2514             return 0;
2515     }
2516 
2517     /**
2518      * Gets and removes a local or stolen task for the given worker.
2519      *
2520      * @return a task, if available
2521      */
2522     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2523         ForkJoinTask<?> t;
2524         if (w == null || (t = w.nextLocalTask()) == null)
2525             t = pollScan(false);
2526         return t;
2527     }
2528 
2529     // External operations
2530 
2531     /**
2532      * Finds and locks a WorkQueue for an external submitter, or
2533      * throws RejectedExecutionException if shutdown.
2534      * @param rejectOnShutdown true if RejectedExecutionException
2535      *        should be thrown when shutdown
2536      */
2537     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2538         int r;
2539         if ((r = ThreadLocalRandom.getProbe()) == 0) {
2540             ThreadLocalRandom.localInit();   // initialize caller's probe
2541             r = ThreadLocalRandom.getProbe();
2542         }
2543         for (;;) {
2544             WorkQueue q; WorkQueue[] qs; int n, id, i;
2545             if ((qs = queues) == null || (n = qs.length) <= 0)
2546                 break;
2547             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2548                 WorkQueue newq = new WorkQueue(null, id, 0, false);
2549                 lockRunState();
2550                 if (qs[i] == null && queues == qs)
2551                     q = qs[i] = newq;         // else lost race to install
2552                 unlockRunState();
2553             }
2554             if (q != null && q.tryLockPhase()) {
2555                 if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2556                     q.unlockPhase();          // check while q lock held
2557                     break;
2558                 }
2559                 return q;
2560             }
2561             r = ThreadLocalRandom.advanceProbe(r); // move
2562         }
2563         throw new RejectedExecutionException();
2564     }
2565 
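The slot-installation step in externalSubmissionQueue re-reads `qs[i]` while holding the runState lock, so a racing installer's queue wins and is reused rather than overwritten. A minimal standalone sketch of that check-then-install idiom, using a plain ReentrantLock in place of the pool's runState lock (all names hypothetical):

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch (not part of ForkJoinPool): install a candidate into a slot
// only if still empty under the lock; otherwise keep the racing
// winner's object, mirroring "else lost race to install" above.
public class InstallDemo {
    final ReentrantLock lock = new ReentrantLock();
    final Object[] slots = new Object[8];

    Object install(int i, Object candidate) {
        Object q;
        lock.lock();
        try {
            if ((q = slots[i]) == null)   // re-check under lock
                q = slots[i] = candidate;
        } finally {
            lock.unlock();
        }
        return q;                         // installed or preexisting
    }
}
```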
2566     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2567         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2568         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2569             (wt = (ForkJoinWorkerThread)t).pool == this) {
2570             internal = true;
2571             q = wt.workQueue;
2572         }
2573         else {                     // find and lock queue
2574             internal = false;
2575             q = externalSubmissionQueue(true);
2576         }
2577         q.push(task, signalIfEmpty ? this : null, internal);
2578         return task;
2579     }
2580 
2581     /**
2582      * Returns queue for an external thread, if one exists that has
2583      * possibly ever submitted to the given pool (nonzero probe), or
2584      * null if none.
2585      */
2586     static WorkQueue externalQueue(ForkJoinPool p) {
2587         WorkQueue[] qs; int n;
2588         int r = ThreadLocalRandom.getProbe();
2589         return (p != null && (qs = p.queues) != null &&
2590                 (n = qs.length) > 0 && r != 0) ?
2591             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2592     }
2593 
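externalQueue maps a submitter's probe to a slot with `r & EXTERNAL_ID_MASK & (n - 1)`. Since registerWorker assigns workers odd ids (`(seed << 1) | 1`), external queues land on even slots when the mask keeps the low bit clear. A sketch under that assumption; the mask value below is illustrative, not the real constant, and only its cleared low bit matters:

```java
// Sketch (not part of ForkJoinPool): hash a thread probe to an even
// slot index in a power-of-two queue table, leaving odd slots to
// worker queues.
public class SlotMapDemo {
    static final int EXTERNAL_ID_MASK = 0xfffe;   // assumed: low bit clear

    static int slot(int probe, int tableLen) {    // tableLen: power of two
        return probe & EXTERNAL_ID_MASK & (tableLen - 1);
    }
}
```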
2594     /**
2595      * Returns external queue for common pool.
2596      */
2597     static WorkQueue commonQueue() {
2598         return externalQueue(common);
2599     }
2600 