543 * WorkQueue field "phase" encodes the queue array id in lower
544 * bits, and otherwise acts similarly to the pool runState field:
545 * The "IDLE" bit is clear while active (either a released worker
546 * or a locked external queue), with other bits serving as a
547 * version counter to distinguish changes across multiple reads.
548 * Note that phase field updates lag queue CAS releases; seeing a
549 * non-idle phase does not guarantee that the worker is available
550 * (and so is never checked in this way).
551 *
552 * The ctl field also serves as the basis for memory
553 * synchronization surrounding activation. This uses a more
554 * efficient version of a Dekker-like rule that task producers and
555 * consumers sync with each other by both writing/CASing ctl (even
556 * if to its current value). However, rather than CASing ctl to
557 * its current value in the common case where no action is
558 * required, we reduce write contention by ensuring that
559 * signalWork invocations are prefaced with a fully fenced memory
560 * access (which is usually needed anyway).
561 *
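 * A minimal, self-contained sketch of this handshake; the class and
 * field names below are illustrative, not the pool's own:
 *
 *   import java.util.concurrent.atomic.AtomicLong;
 *   import java.util.concurrent.atomic.AtomicReferenceArray;
 *
 *   final class HandshakeSketch {
 *     final AtomicReferenceArray<Runnable> slots = new AtomicReferenceArray<>(64);
 *     final AtomicLong idle = new AtomicLong();    // > 0 when a consumer is idle
 *
 *     void produce(int i, Runnable task) {
 *       slots.getAndSet(i, task);                  // fully fenced publish
 *       if (idle.get() > 0L)                       // read after the fence
 *         signal();                                // wake an idle consumer
 *     }
 *
 *     boolean tryDeactivate(int i) {
 *       long c = idle.get();
 *       if (!idle.compareAndSet(c, c + 1))         // full fence; announce idleness
 *         return false;
 *       if (slots.get(i) != null) {                // re-check after the fence
 *         idle.compareAndSet(c + 1, c);            // a task slipped in; stay active
 *         return false;
 *       }
 *       return true;                               // safe to park; producers will signal
 *     }
 *
 *     void signal() { }                            // unpark an idle consumer (omitted)
 *   }
 *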
562 * Signalling. Signals (in signalWork) cause new or reactivated
563 * workers to scan for tasks. Method signalWork and its callers
564 * try to approximate the unattainable goal of having the right
565 * number of workers activated for the tasks at hand, but must err
566 * on the side of too many workers vs too few to avoid stalls:
567 *
568 * * If computations are purely tree structured, it suffices for
569 * every worker to activate another when it pushes a task into
570 * an empty queue, resulting in O(log(#threads)) steps to full
571 * activation. Emptiness must be conservatively approximated,
572 * which may result in unnecessary signals. Also, to reduce
573 * resource usages in some cases, at the expense of slower
574 * startup in others, activation of an idle thread is preferred
575 * over creating a new one, here and elsewhere.
576 *
577 * * At the other extreme, if "flat" tasks (those that do not in
578 * turn generate others) come in serially from only a single
579 * producer, each worker taking a task from a queue should
580 * propagate a signal if there are more tasks in that
581 * queue. This is equivalent to, but generally faster than,
582 *   arranging for the stealer to take multiple tasks, re-pushing
583 *   one or more on its own queue, and signalling (because its
584 *   queue is empty), also resulting in logarithmic full activation
585 *   time. If tasks do not engage in unbounded loops based on
586 *   the actions of other workers with unknown dependencies,
587 *   this form of propagation can be limited to one signal per
588 *   activation (phase change), as sketched after this list. We
589 *   distinguish the cases by further signalling only if the task
590 *   is an InterruptibleTask (see below), the only supported form
591 *   of task that may do so.
592 *
593 * * Because we don't know about usage patterns (or most commonly,
594 * mixtures), we use both approaches, which present even more
595 * opportunities to over-signal. (Failure to distinguish these
596 * cases in terms of submission methods was arguably an early
597 * design mistake.) Note that in either of these contexts,
598 * signals may be (and often are) unnecessary because active
599 * workers continue scanning after running tasks without the
600 * need to be signalled (which is one reason work stealing is
601 * often faster than alternatives), so additional workers
602 * aren't needed.
603 *
604 * * For rapidly branching tasks that require full pool resources,
605 * oversignalling is OK, because signalWork will soon have no
606 * more workers to create or reactivate. But for others (mainly
607 * externally submitted tasks), overprovisioning may cause very
608 * noticeable slowdowns due to contention and resource
609 * wastage. We reduce impact by deactivating workers when
610 * queues don't have accessible tasks, but reactivating and
611 * rescanning if other tasks remain.
612 *
613 * * Despite these, signal contention and overhead effects still
614 * occur during ramp-up and ramp-down of small computations.
615 *
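 * A small sketch of the per-take propagation rule from the second
 * bullet above; the queue and method names are illustrative only:
 *
 *   import java.util.Queue;
 *   import java.util.concurrent.ConcurrentLinkedQueue;
 *
 *   final class PropagationSketch {
 *     final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
 *
 *     void takeAndRun() {
 *       Runnable t = queue.poll();
 *       if (t != null) {
 *         if (queue.peek() != null)    // more tasks remain behind the one taken
 *           signal();                  // propagate: activate one more worker
 *         t.run();
 *       }
 *     }
 *
 *     void signal() { }                // release or create a worker (omitted)
 *   }
 *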
616 * Scanning. Method runWorker performs top-level scanning for (and
617 * execution of) tasks by polling a pseudo-random permutation of
618 * the array (by starting at a given index, and using a constant
619 * cyclically exhaustive stride). It uses the same basic polling
620 * method as WorkQueue.poll(), but restarts with a different
621 * permutation on each invocation. The pseudorandom generator
622 * need not have high-quality statistical properties in the long
623 * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
624 * from ThreadLocalRandom probes, which are cheap and
625 * suffice. Each queue's polling attempts to avoid becoming stuck
626 * when other scanners/pollers stall. Scans do not otherwise
627 * explicitly take into account core affinities, loads, cache
628 * localities, etc. However, they do exploit temporal locality
629 * (which usually approximates these) by preferring to re-poll
630 * from the same queue after a successful poll before trying
631 * others, which also reduces bookkeeping, cache traffic, and
632 * scanning overhead. But it also reduces fairness, which is
633 * partially counteracted by giving up on detected interference
634 * (which also reduces contention when too many workers try to
635 * take small tasks from the same queue).
636 *
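 * A compact sketch of one such scan; the method below is illustrative
 * and assumes a power-of-two array length, as in this class:
 *
 *   static int scanOnce(Object[] qs, int r) {
 *     r ^= r << 13; r ^= r >>> 17; r ^= r << 5;    // Marsaglia xorshift reseed
 *     int n = qs.length;                           // power of two
 *     int i = r, step = (r >>> 16) | 1;            // odd stride visits every index
 *     for (int l = n; l > 0; --l, i += step) {
 *       Object q = qs[i & (n - 1)];                // candidate queue to poll here
 *     }
 *     return r;                                    // carry seed to the next scan
 *   }
 *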
637 * Deactivation. When no tasks are found by a worker in runWorker,
638 * it tries to deactivate(), giving up (and rescanning) on "ctl"
639 * contention. To avoid missed signals, the method rescans after
640 * deactivating and reactivates if a signal may have been missed
641 * in the interim. To reduce false-alarm reactivations
642 * while doing so, we scan multiple times (analogously to method
643 * quiescent()) before trying to reactivate. Because idle workers
644 * are often not yet blocked (parked), we use a WorkQueue field to
645 * advertise that a waiter actually needs unparking upon signal.
646 *
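 * A minimal sketch of the "advertise before parking" idea; the class
 * and field names below are illustrative, not the pool's own:
 *
 *   import java.util.concurrent.locks.LockSupport;
 *
 *   final class ParkSketch {
 *     volatile int phase;                   // changes when a signal arrives
 *     volatile int parking;                 // nonzero only when unpark is needed
 *
 *     void await(int activePhase) {
 *       parking = 1;                        // advertise that we may block
 *       while (phase != activePhase)
 *         LockSupport.park(this);           // tolerates spurious wakeups
 *       parking = 0;
 *     }
 *
 *     void signal(Thread owner, int activePhase) {
 *       phase = activePhase;                // release the waiter
 *       if (parking != 0)
 *         LockSupport.unpark(owner);        // pay for unpark only when advertised
 *     }
 *   }
 *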
647 * Quiescence. Workers scan looking for work, giving up when they
648 * don't find any, without being sure that none are available.
649 * However, some required functionality relies on consensus about
650 * quiescence (also termination, discussed below). The count
651 * fields in ctl allow accurate discovery of states in which all
652 * workers are idle. However, because external (asynchronous)
653 * submitters are not part of this vote, these mechanisms
654 * themselves do not guarantee that the pool is in a quiescent
655 * state with respect to methods isQuiescent, shutdown (which
656 * begins termination when quiescent), helpQuiesce, and indirectly
657 * others including tryCompensate. Method quiescent() is used in
658 * all of these contexts. It provides checks that all workers are
659 * idle and there are no submissions that they could poll if they
660 * were not idle, retrying on inconsistent reads of queues and
661 * using the runState seqLock to retry on queue array updates.
662 * (It also reports quiescence if the pool is terminating.) A true
663 * report means only that there was a moment at which quiescence
664 * held. False negatives are inevitable (for example when queue
665 * indices lag updates, as described above), which is accommodated
875 * ====================
876 *
877 * Regular ForkJoinTasks manage task cancellation (method cancel)
878 * independently from the interrupt status of threads running
879 * tasks. Interrupts are issued internally only while
880 * terminating, to wake up workers and cancel queued tasks. By
881 * default, interrupts are cleared only when necessary to ensure
882 * that calls to LockSupport.park do not loop indefinitely (park
883 * returns immediately if the current thread is interrupted).
884 *
885 * To comply with ExecutorService specs, we use subclasses of
886 * abstract class InterruptibleTask for tasks that require
887 * stronger interruption and cancellation guarantees. External
888 * submitters never run these tasks, even if in the common pool
889 * (as indicated by ForkJoinTask.noUserHelp status bit).
890 * InterruptibleTasks include a "runner" field (implemented
891 * similarly to FutureTask) to support cancel(true). Upon pool
892 * shutdown, runners are interrupted so they can cancel. Since
893 * external joining callers never run these tasks, they must await
894 * cancellation by others, which can occur along several different
895 * paths. The inability to rely on caller-runs may also require
896 * extra signalling (resulting in scanning and contention) so is
897 * done only conditionally in methods push and runWorker.
898 *
899 * Across these APIs, rules for reporting exceptions for tasks
900 * with results accessed via join() differ from those via get(),
901 * which differ from those invoked using pool submit methods by
902 * non-workers (which comply with Future.get() specs). Internal
903 * usages of ForkJoinTasks ignore interrupt status when executing
904 * or awaiting completion. Otherwise, reporting task results or
905 * exceptions is preferred to throwing InterruptedExceptions,
906 * which are in turn preferred to timeouts. Similarly, completion
907 * status is preferred to reporting cancellation. Cancellation is
908 * reported as an unchecked exception by join(), and by worker
909 * calls to get(), but is otherwise wrapped in a (checked)
910 * ExecutionException.
911 *
912 * Worker Threads cannot be VirtualThreads, as enforced by
913 * requiring ForkJoinWorkerThreads in factories. There are
914 * several constructions relying on this. However as of this
915 * writing, virtual thread bodies are by default run as some form
916 * of InterruptibleTask.
917 *
955 * We isolate the ForkJoinPool.ctl field that otherwise causes the
956 * most false-sharing misses with respect to other fields. Also,
957 * ForkJoinPool fields are ordered such that fields less prone to
958 * contention effects are first, offsetting those that otherwise
959 * would be, while also reducing total footprint vs using
960 * multiple @Contended regions, which tends to slow down
961 * less-contended applications. To help arrange this, some
962 * non-reference fields are declared as "long" even when ints or
963 * shorts would suffice. For class WorkQueue, an
964 * embedded @Contended region segregates fields most heavily
965 * updated by owners from those most commonly read by stealers or
966 * other management.
967 *
968 * Initial sizing and resizing of WorkQueue arrays is an even more
969 * delicate tradeoff because the best strategy systematically
970 * varies across garbage collectors. Small arrays are better for
971 * locality and reduce GC scan time, but large arrays reduce both
972 * direct false-sharing and indirect cases due to GC bookkeeping
973 * (cardmarks etc), and reduce the number of resizes, which are
974 * not especially fast because they require atomic transfers.
975 * Currently, arrays for workers are initialized to be just large
976 * enough to avoid resizing in most tree-structured tasks, but
977 * larger for external queues where both false-sharing problems
978 * and the need for resizing are more common. (Maintenance note:
979 * any changes in fields, queues, or their uses, or JVM layout
980 * policies, must be accompanied by re-evaluation of these
981 * placement and sizing decisions.)
982 *
983 * Style notes
984 * ===========
985 *
986 * Memory ordering relies mainly on atomic operations (CAS,
987 * getAndSet, getAndAdd) along with moded accesses. These use
988 * jdk-internal Unsafe for atomics and special memory modes,
989 * rather than VarHandles, to avoid initialization dependencies in
990 * other jdk components that require early parallelism. This can
991 * be awkward and ugly, but also reflects the need to control
992 * outcomes across the unusual cases that arise in very racy code
993 * with very few invariants. All atomic task slot updates use
994 * Unsafe operations requiring offset positions, not indices, as
995 * computed by method slotOffset. All fields are read into locals
996 * before use, and null-checked if they are references, even if
997 * they can never be null under current usages. Usually,
998 * computations (held in local variables) are defined as soon as
999 * logically enabled, sometimes to convince compilers that they
1000 * may be performed despite memory ordering constraints. Array
1001 * accesses using masked indices include checks (that are always
1044 */
1045 static final long DEFAULT_KEEPALIVE = 60_000L;
1046
1047 /**
1048 * Undershoot tolerance for idle timeouts, also serving as the
1049 * minimum allowed timeout value.
1050 */
1051 static final long TIMEOUT_SLOP = 20L;
1052
1053 /**
1054 * The default value for common pool maxSpares. Overridable using
1055 * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1056 * system property. The default value is far in excess of normal
1057 * requirements, but also far short of maximum capacity and typical OS
1058 * thread limits, so allows JVMs to catch misuse/abuse before
1059 * running out of resources needed to do so.
1060 */
1061 static final int DEFAULT_COMMON_MAX_SPARES = 256;
1062
1063 /**
1064 * Initial capacity of work-stealing queue array for workers.
1065 * Must be a power of two, at least 2. See above.
1066 */
1067 static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1068
1069 /**
1070 * Initial capacity of work-stealing queue array for external queues.
1071 * Must be a power of two, at least 2. See above.
1072 */
1073 static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
1074
1075 // conversions among short, int, long
1076 static final int SMASK = 0xffff; // (unsigned) short bits
1077 static final long LMASK = 0xffffffffL; // lower 32 bits of long
1078 static final long UMASK = ~LMASK; // upper 32 bits
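    // Illustrative use of the masks above (helper name hypothetical):
    // splitting and repacking a long such as ctl into 32-bit halves.
    //   static long replaceLow32(long packed, int newLow) {
    //       return (newLow & LMASK) | (packed & UMASK); // swap low half, keep high
    //   }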
1079
1080 // masks and sentinels for queue indices
1081 static final int MAX_CAP = 0x7fff; // max # workers
1082 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
1083 static final int INVALID_ID = 0x4000; // unused external queue id
1084
1085 // pool.runState bits
1086 static final long STOP = 1L << 0; // terminating
1087 static final long SHUTDOWN = 1L << 1; // terminate when quiescent
1088 static final long CLEANED = 1L << 2; // stopped and queues cleared
1089 static final long TERMINATED = 1L << 3; // only set if STOP also set
1090 static final long RS_LOCK = 1L << 4; // lowest seqlock bit
1091
1092 // spin/sleep limits for runState locking and elsewhere
1093 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
1094 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
1231 U.putIntOpaque(this, TOP, v);
1232 }
1233 final void updateArray(ForkJoinTask<?>[] a) {
1234 U.getAndSetReference(this, ARRAY, a);
1235 }
1236 final void unlockPhase() {
1237 U.getAndAddInt(this, PHASE, IDLE);
1238 }
1239 final boolean tryLockPhase() { // seqlock acquire
1240 int p;
1241 return (((p = phase) & IDLE) != 0 &&
1242 U.compareAndSetInt(this, PHASE, p, p + IDLE));
1243 }
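        // Note on the arithmetic above (assuming IDLE is a single bit,
        // as its uses here suggest): a successful tryLockPhase CAS of
        // p -> p + IDLE clears the set IDLE bit and carries one into
        // the higher "version" bits, so each lock also advances the
        // seqlock count; unlockPhase then adds IDLE back with no carry.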
1244
1245 /**
1246 * Constructor. For internal queues, most fields are initialized
1247 * upon thread start in pool.registerWorker.
1248 */
1249 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1250 boolean clearThreadLocals) {
1251 array = new ForkJoinTask<?>[owner == null ?
1252 INITIAL_EXTERNAL_QUEUE_CAPACITY :
1253 INITIAL_QUEUE_CAPACITY];
1254 this.owner = owner;
1255 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1256 }
1257
1258 /**
1259 * Returns an exportable index (used by ForkJoinWorkerThread).
1260 */
1261 final int getPoolIndex() {
1262 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1263 }
1264
1265 /**
1266 * Returns the approximate number of tasks in the queue.
1267 */
1268 final int queueSize() {
1269 int unused = phase; // for ordering effect
1270 return Math.max(top - base, 0); // ignore transient negative
1271 }
1272
1273 /**
1274 * Pushes a task. Called only by owner or if already locked
1275 *
1276 * @param task the task; no-op if null
1277 * @param pool the pool to signal if was previously empty, else null
1278 * @param internal if caller owns this queue
1279 * @throws RejectedExecutionException if array could not be resized
1280 */
1281 final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1282 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1283 if ((a = array) != null && (cap = a.length) > 0 && // else disabled
1284 task != null) {
1285 int pk = task.noUserHelp() + 1; // prev slot offset
1286 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1287 top = s + 1;
1288 long pos = slotOffset(m & s);
1289 if (!internal)
1290 U.putReference(a, pos, task); // inside lock
1291 else
1292 U.getAndSetReference(a, pos, task); // fully fenced
1293 if (room == 0) // resize
1294 growArray(a, cap, s);
1295 }
1296 if (!internal)
1297 unlockPhase();
1298 if (room < 0)
1299 throw new RejectedExecutionException("Queue capacity exceeded");
1300 if ((room == 0 || a[m & (s - pk)] == null) &&
1301 pool != null)
1302 pool.signalWork(); // may have appeared empty
1303 }
1304 }
1305
1306 /**
1307 * Resizes the queue array unless out of memory.
1308 * @param a old array
1309 * @param cap old array capacity
1310 * @param s current top
1311 */
1312 private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
1313 int newCap = cap << 1;
1314 if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1315 ForkJoinTask<?>[] newArray = null;
1316 try {
1317 newArray = new ForkJoinTask<?>[newCap];
1318 } catch (OutOfMemoryError ex) {
1319 }
1320 if (newArray != null) { // else throw on next push
1321 int mask = cap - 1, newMask = newCap - 1;
1322 for (int k = s, j = cap; j > 0; --j, --k) {
1323 ForkJoinTask<?> u; // poll old, push to new
1324 if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1325 a, slotOffset(k & mask), null)) == null)
1326 break; // lost to pollers
1327 newArray[k & newMask] = u;
1328 }
1329 updateArray(newArray); // fully fenced
1330 }
1331 }
1332 }
1333
1334 /**
1335 * Takes next task, if one exists, in order specified by mode,
1336 * so acts as either local-pop or local-poll. Called only by owner.
1337 * @param fifo nonzero if FIFO mode
1338 */
1339 private ForkJoinTask<?> nextLocalTask(int fifo) {
1340 ForkJoinTask<?> t = null;
1341 ForkJoinTask<?>[] a = array;
1342 int b = base, p = top, cap;
1343 if (p - b > 0 && a != null && (cap = a.length) > 0) {
1344 for (int m = cap - 1, s, nb;;) {
1345 if (fifo == 0 || (nb = b + 1) == p) {
1346 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347 a, slotOffset(m & (s = p - 1)), null)) != null)
1348 updateTop(s); // else lost race for only task
1349 break;
1350 }
1351 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1763 return false;
1764 }
1765
1766 /**
1767 * Provides a name for ForkJoinWorkerThread constructor.
1768 */
1769 final String nextWorkerThreadName() {
1770 String prefix = workerNamePrefix;
1771 long tid = incrementThreadIds() + 1L;
1772 if (prefix == null) // commonPool has no prefix
1773 prefix = "ForkJoinPool.commonPool-worker-";
1774 return prefix.concat(Long.toString(tid));
1775 }
1776
1777 /**
1778 * Finishes initializing and records internal queue.
1779 *
1780 * @param w caller's WorkQueue
1781 */
1782 final void registerWorker(WorkQueue w) {
1783 if (w != null && (runState & STOP) == 0L) {
1784 ThreadLocalRandom.localInit();
1785 int seed = w.stackPred = ThreadLocalRandom.getProbe();
1786 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1787 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1788 long stop = lockRunState() & STOP;
1789 try {
1790 WorkQueue[] qs; int n;
1791 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1792 for (int k = n, m = n - 1; ; id += 2) {
1793 if (qs[id &= m] == null)
1794 break;
1795 if ((k -= 2) <= 0) {
1796 id |= n;
1797 break;
1798 }
1799 }
1800 w.phase = id | phaseSeq; // now publishable
1801 if (id < n)
1802 qs[id] = w;
1803 else { // expand
1841 do {} while (c != (c = compareAndExchangeCtl(
1842 c, ((RC_MASK & (c - RC_UNIT)) |
1843 (TC_MASK & (c - TC_UNIT)) |
1844 (LMASK & c)))));
1845 }
1846 if (phase != 0 && w != null) { // remove index unless terminating
1847 long ns = w.nsteals & 0xffffffffL;
1848 if ((runState & STOP) == 0L) {
1849 WorkQueue[] qs; int n, i;
1850 if ((lockRunState() & STOP) == 0L &&
1851 (qs = queues) != null && (n = qs.length) > 0 &&
1852 qs[i = phase & SMASK & (n - 1)] == w) {
1853 qs[i] = null;
1854 stealCount += ns; // accumulate steals
1855 }
1856 unlockRunState();
1857 }
1858 }
1859 if ((tryTerminate(false, false) & STOP) == 0L &&
1860 phase != 0 && w != null && w.source != DROPPED) {
1861 signalWork(); // possibly replace
1862 w.cancelTasks(); // clean queue
1863 }
1864 if (ex != null)
1865 ForkJoinTask.rethrow(ex);
1866 }
1867
1868 /**
1869 * Releases an idle worker, or creates one if not enough exist.
1870 */
1871 final void signalWork() {
1872 int pc = parallelism;
1873 for (long c = ctl;;) {
1874 WorkQueue[] qs = queues;
1875 long ac = (c + RC_UNIT) & RC_MASK, nc;
1876 int sp = (int)c, i = sp & SMASK;
1877 if ((short)(c >>> RC_SHIFT) >= pc)
1878 break;
1879 if (qs == null)
1880 break;
1881 if (qs.length <= i)
1882 break;
1883 WorkQueue w = qs[i], v = null;
1884 if (sp == 0) {
1885 if ((short)(c >>> TC_SHIFT) >= pc)
1886 break;
1887 nc = ((c + TC_UNIT) & TC_MASK);
1888 }
1889 else if ((v = w) == null)
1890 break;
1891 else
1892 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1893 if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1894 if (v == null)
1895 createWorker();
1896 else {
1897 v.phase = sp;
1898 if (v.parking != 0)
1899 U.unpark(v.owner);
1900 }
1901 break;
1902 }
1903 }
1904 }
1905
1906 /**
1907 * Releases all waiting workers. Called only during shutdown.
1908 */
1909 private void releaseWaiters() {
1910 for (long c = ctl;;) {
1911 WorkQueue[] qs; WorkQueue v; int sp, i;
1912 if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
1957 else if ((e & SHUTDOWN) == 0)
1958 return 0;
1959 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960 return 0;
1961 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962 return 1; // enable termination
1963 else
1964 break; // restart
1965 }
1966 }
1967 }
1968
1969 /**
1970 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971 * See above for explanation.
1972 *
1973 * @param w caller's WorkQueue (may be null on failed initialization)
1974 */
1975 final void runWorker(WorkQueue w) {
1976 if (w != null) {
1977 int phase = w.phase, r = w.stackPred; // seed from registerWorker
1978 int fifo = w.config & FIFO, nsteals = 0, src = -1;
1979 for (;;) {
1980 WorkQueue[] qs;
1981 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1982 if ((runState & STOP) != 0L || (qs = queues) == null)
1983 break;
1984 int n = qs.length, i = r, step = (r >>> 16) | 1;
1985 boolean rescan = false;
1986 scan: for (int l = n; l > 0; --l, i += step) { // scan queues
1987 int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1988 if ((q = qs[j = i & (n - 1)]) != null &&
1989 (a = q.array) != null && (cap = a.length) > 0) {
1990 for (int m = cap - 1, pb = -1, b = q.base;;) {
1991 ForkJoinTask<?> t; long k;
1992 t = (ForkJoinTask<?>)U.getReferenceAcquire(
1993 a, k = slotOffset(m & b));
1994 if (b != (b = q.base) || t == null ||
1995 !U.compareAndSetReference(a, k, t, null)) {
1996 if (a[b & m] == null) {
1997 if (rescan) // end of run
1998 break scan;
1999 if (a[(b + 1) & m] == null &&
2000 a[(b + 2) & m] == null) {
2001 break; // probably empty
2002 }
2003 if (pb == (pb = b)) { // track progress
2004 rescan = true; // stalled; reorder scan
2005 break scan;
2006 }
2007 }
2008 }
2009 else {
2010 boolean propagate;
2011 int nb = q.base = b + 1, prevSrc = src;
2012 w.nsteals = ++nsteals;
2013 w.source = src = j; // volatile
2014 rescan = true;
2015 int nh = t.noUserHelp();
2016 if (propagate =
2017 (prevSrc != src || nh != 0) && a[nb & m] != null)
2018 signalWork();
2019 w.topLevelExec(t, fifo);
2020 if ((b = q.base) != nb && !propagate)
2021 break scan; // reduce interference
2022 }
2023 }
2024 }
2025 }
2026 if (!rescan) {
2027 if (((phase = deactivate(w, phase)) & IDLE) != 0)
2028 break;
2029 src = -1; // re-enable propagation
2030 }
2031 }
2032 }
2033 }
2034
2035 /**
2036 * Deactivates and if necessary awaits signal or termination.
2037 *
2038 * @param w the worker
2039 * @param phase current phase
2040 * @return current phase, with IDLE set if worker should exit
2041 */
2042 private int deactivate(WorkQueue w, int phase) {
2043 if (w == null) // currently impossible
2044 return IDLE;
2045 int p = phase | IDLE, activePhase = phase + (IDLE << 1);
2046 long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
2047 int sp = w.stackPred = (int)pc; // set ctl stack link
2048 w.phase = p;
2049 if (!compareAndSetCtl(pc, qc)) // try to enqueue
2050 return w.phase = phase; // back out on possible signal
2051 int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
2052 if (((e = runState) & STOP) != 0L ||
2053 ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
2054 (qs = queues) == null || (n = qs.length) <= 0)
2055 return IDLE; // terminating
2056
2057 for (int prechecks = Math.min(ac, 2), // reactivation threshold
2058 k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
2059 WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
2060 if (w.phase == activePhase)
2061 return activePhase;
2062 if (--k < 0)
2063 return awaitWork(w, p); // block, drop, or exit
2064 if ((q = qs[k & (n - 1)]) == null)
2065 Thread.onSpinWait();
2066 else if ((a = q.array) != null && (cap = a.length) > 0 &&
2067 a[q.base & (cap - 1)] != null && --prechecks < 0 &&
2068 (int)(c = ctl) == activePhase &&
2069 compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
2070 return w.phase = activePhase; // reactivate
2071 }
2072 }
2073
2074 /**
2075 * Awaits signal or termination.
2076 *
2077 * @param w the work queue
2078 * @param p current phase (known to be idle)
2079 * @return current phase, with IDLE set if worker should exit
2080 */
2081 private int awaitWork(WorkQueue w, int p) {
2082 if (w != null) {
2083 ForkJoinWorkerThread t; long deadline;
2084 if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
2085 t.resetThreadLocals(); // clear before reactivate
2086 if ((ctl & RC_MASK) > 0L)
2087 deadline = 0L;
2088 else if ((deadline =
2089 (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
2090 System.currentTimeMillis()) == 0L)
2091 deadline = 1L; // avoid zero
2092 int activePhase = p + IDLE;
2093 if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
2094 LockSupport.setCurrentBlocker(this);
2095 w.parking = 1; // enable unpark
2096 while ((p = w.phase) != activePhase) {
2097 boolean trimmable = false; int trim;
2098 Thread.interrupted(); // clear status
2099 if ((runState & STOP) != 0L)
2100 break;
2101 if (deadline != 0L) {
2102 if ((trim = tryTrim(w, p, deadline)) > 0)
2103 break;
2104 else if (trim < 0)
2105 deadline = 0L;
2106 else
2107 trimmable = true;
2108 }
2109 U.park(trimmable, deadline);
2110 }
2111 w.parking = 0;
2112 LockSupport.setCurrentBlocker(null);
2113 }
2114 }
2115 return p;
2116 }
2117
2118 /**
2119 * Tries to remove and deregister worker after timeout, and release
2120 * another to do the same.
2121 * @return > 0: trimmed, < 0 : not trimmable, else 0
2122 */
2123 private int tryTrim(WorkQueue w, int phase, long deadline) {
2124 long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2125 if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2126 stat = -1; // no longer ctl top
2127 else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2128 stat = 0; // spurious wakeup
2129 else if (!compareAndSetCtl(
2130 c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2131 (TC_MASK & (c - TC_UNIT)))))
2132 stat = -1; // lost race to signaller
2133 else {
2134 stat = 1;
2135 w.source = DROPPED;
2544 else
2545 return 0;
2546 }
2547
2548 /**
2549 * Gets and removes a local or stolen task for the given worker.
2550 *
2551 * @return a task, if available
2552 */
2553 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554 ForkJoinTask<?> t;
2555 if (w == null || (t = w.nextLocalTask()) == null)
2556 t = pollScan(false);
2557 return t;
2558 }
2559
2560 // External operations
2561
2562 /**
2563 * Finds and locks a WorkQueue for an external submitter, or
2564 * throws RejectedExecutionException if shutdown or terminating.
2565 * @param r current ThreadLocalRandom.getProbe() value
2566 * @param rejectOnShutdown true if RejectedExecutionException
2567 * should be thrown when shutdown (else only if terminating)
2568 */
2569 private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
2570 int reuse; // nonzero if prefer create
2571 if ((reuse = r) == 0) {
2572 ThreadLocalRandom.localInit(); // initialize caller's probe
2573 r = ThreadLocalRandom.getProbe();
2574 }
2575 for (int probes = 0; ; ++probes) {
2576 int n, i, id; WorkQueue[] qs; WorkQueue q;
2577 if ((qs = queues) == null)
2578 break;
2579 if ((n = qs.length) <= 0)
2580 break;
2581 if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582 WorkQueue w = new WorkQueue(null, id, 0, false);
2583 w.phase = id;
2584 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585 rejectOnShutdown);
2586 if (!reject && queues == qs && qs[i] == null)
2587 q = qs[i] = w; // else lost race to install
2588 unlockRunState();
2589 if (q != null)
2590 return q;
2591 if (reject)
2592 break;
2593 reuse = 0;
2594 }
2595 if (reuse == 0 || !q.tryLockPhase()) { // move index
2596 if (reuse == 0) {
2597 if (probes >= n >> 1)
2598 reuse = r; // stop preferring free slot
2599 }
2600 else if (q != null)
2601 reuse = 0; // probe on collision
2602 r = ThreadLocalRandom.advanceProbe(r);
2603 }
2604 else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605 q.unlockPhase(); // check while q lock held
2606 break;
2607 }
2608 else
2609 return q;
2610 }
2611 throw new RejectedExecutionException();
2612 }
2613
2614 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616 if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617 (wt = (ForkJoinWorkerThread)t).pool == this) {
2618 internal = true;
2619 q = wt.workQueue;
2620 }
2621 else { // find and lock queue
2622 internal = false;
2623 q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624 }
2625 q.push(task, signalIfEmpty ? this : null, internal);
2626 return task;
2627 }
2628
2629 /**
2630 * Returns queue for an external submission, bypassing call to
2631 * submissionQueue if already established and unlocked.
2632 */
2633 final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634 WorkQueue[] qs; WorkQueue q; int n;
2635 int r = ThreadLocalRandom.getProbe();
2636 return (((qs = queues) != null && (n = qs.length) > 0 &&
2637 (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638 q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639 }
2640
2641 /**
2642 * Returns queue for an external thread, if one exists that has
2643 * possibly ever submitted to the given pool (nonzero probe), or
2644 * null if none.
2645 */
2646 static WorkQueue externalQueue(ForkJoinPool p) {
2647 WorkQueue[] qs; int n;
2648 int r = ThreadLocalRandom.getProbe();
2649 return (p != null && (qs = p.queues) != null &&
2650 (n = qs.length) > 0 && r != 0) ?
2651 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2652 }
2653
2654 /**
2655 * Returns external queue for common pool.
2656 */
2657 static WorkQueue commonQueue() {
2658 return externalQueue(common);
2659 }
2660
|
543 * WorkQueue field "phase" encodes the queue array id in lower
544 * bits, and otherwise acts similarly to the pool runState field:
545 * The "IDLE" bit is clear while active (either a released worker
546 * or a locked external queue), with other bits serving as a
547 * version counter to distinguish changes across multiple reads.
548 * Note that phase field updates lag queue CAS releases; seeing a
549 * non-idle phase does not guarantee that the worker is available
550 * (and so is never checked in this way).
551 *
552 * The ctl field also serves as the basis for memory
553 * synchronization surrounding activation. This uses a more
554 * efficient version of a Dekker-like rule that task producers and
555 * consumers sync with each other by both writing/CASing ctl (even
556 * if to its current value). However, rather than CASing ctl to
557 * its current value in the common case where no action is
558 * required, we reduce write contention by ensuring that
559 * signalWork invocations are prefaced with a fully fenced memory
560 * access (which is usually needed anyway).
561 *
562 * Signalling. Signals (in signalWork) cause new or reactivated
563 * workers to scan for tasks. Method signalWork is invoked in two
564 * cases:
564 * (1) When a task is pushed onto an empty queue, and (2) When a
565 * worker takes a top-level task from a queue that has additional
566 * tasks. Together, these suffice in O(log(#threads)) steps to
567 * fully activate with at least enough workers, and ideally no
568 * more than required. This ideal is unattainable: callers do not
569 * know whether another worker will finish its current task and
570 * poll for others without need of a signal (which is otherwise an
571 * advantage of work-stealing vs other schemes), and also must
572 * conservatively estimate the triggering conditions of emptiness
573 * or non-emptiness, all of which usually cause more activations
574 * than necessary (see below). (Method signalWork is also used as a
575 * failsafe in case of Thread failures in deregisterWorker.)
576 *
577 * Top-Level scheduling
578 * ====================
579 *
580 * Scanning. Method runWorker performs top-level scanning for (and
581 * execution of) tasks by polling a pseudo-random permutation of
582 * the array (by starting at a given index, and using a constant
583 * cyclically exhaustive stride). It uses the same basic polling
584 * method as WorkQueue.poll(), but restarts with a different
585 * permutation on each rescan. The pseudorandom generator need
586 * not have high-quality statistical properties in the long
587 * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
588 * from ThreadLocalRandom probes, which are cheap and suffice.
589 *
590 * Deactivation. When no tasks are found by a worker in runWorker,
591 * it invokes awaitWork, which first deactivates (to an IDLE
592 * phase). Avoiding missed signals during deactivation requires a
593 * (conservative) rescan, reactivating if there may be tasks to
594 * poll. Because idle workers are often not yet blocked (parked),
595 * we use a WorkQueue field to advertise that a waiter actually
596 * needs unparking upon signal.
597 *
598 * When tasks are constructed as (recursive) dags, top-level
599 * scanning is usually infrequent, and doesn't encounter most
600 * of the following problems addressed by runWorker and awaitWork:
601 *
602 * Locality. Polls are organized into "runs", continuing until
603 * empty or contended, while also minimizing interference by
604 * postponing bookkeeping to ends of runs. This may reduce
605 * fairness, which is partially counteracted by the following.
606 *
607 * Contention. When many workers try to poll few queues, they
608 * often collide, generating CAS failures and disrupting locality
609 * of workers already running their tasks. This also leads to
610 * stalls when tasks cannot be taken because other workers have
611 * not finished poll operations, which is detected by reading
612 * ahead in queue arrays. In both cases, workers restart scans in a
613 * way that approximates randomized backoff.
614 *
615 * Oversignalling. When many short top-level tasks are present in
616 * a small number of queues, the above signalling strategy may
617 * activate many more workers than needed, worsening locality and
618 * contention problems, while also generating more global
619 * contention (the ctl field is CASed on every activation and
620 * deactivation). We filter out (both in runWorker and
621 * signalWork) attempted signals that are surely not needed
622 * because the signalled tasks are already taken.
623 *
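 * A one-line sketch of such a filter (the helper name is illustrative):
 *
 *   static boolean worthSignalling(Object[] a, int k) {
 *     return a != null && k >= 0 && k < a.length && a[k] != null;
 *   }
 *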
624 * Shutdown and Quiescence
625 * =======================
626 *
627 * Quiescence. Workers scan looking for work, giving up when they
628 * don't find any, without being sure that none are available.
629 * However, some required functionality relies on consensus about
630 * quiescence (also termination, discussed below). The count
631 * fields in ctl allow accurate discovery of states in which all
632 * workers are idle. However, because external (asynchronous)
633 * submitters are not part of this vote, these mechanisms
634 * themselves do not guarantee that the pool is in a quiescent
635 * state with respect to methods isQuiescent, shutdown (which
636 * begins termination when quiescent), helpQuiesce, and indirectly
637 * others including tryCompensate. Method quiescent() is used in
638 * all of these contexts. It provides checks that all workers are
639 * idle and there are no submissions that they could poll if they
640 * were not idle, retrying on inconsistent reads of queues and
641 * using the runState seqLock to retry on queue array updates.
642 * (It also reports quiescence if the pool is terminating.) A true
643 * report means only that there was a moment at which quiescence
644 * held. False negatives are inevitable (for example when queue
645 * indices lag updates, as described above), which is accommodated
855 * ====================
856 *
857 * Regular ForkJoinTasks manage task cancellation (method cancel)
858 * independently from the interrupt status of threads running
859 * tasks. Interrupts are issued internally only while
860 * terminating, to wake up workers and cancel queued tasks. By
861 * default, interrupts are cleared only when necessary to ensure
862 * that calls to LockSupport.park do not loop indefinitely (park
863 * returns immediately if the current thread is interrupted).
864 *
865 * To comply with ExecutorService specs, we use subclasses of
866 * abstract class InterruptibleTask for tasks that require
867 * stronger interruption and cancellation guarantees. External
868 * submitters never run these tasks, even if in the common pool
869 * (as indicated by ForkJoinTask.noUserHelp status bit).
870 * InterruptibleTasks include a "runner" field (implemented
871 * similarly to FutureTask) to support cancel(true). Upon pool
872 * shutdown, runners are interrupted so they can cancel. Since
873 * external joining callers never run these tasks, they must await
874 * cancellation by others, which can occur along several different
875 * paths.
876 *
877 * Across these APIs, rules for reporting exceptions for tasks
878 * with results accessed via join() differ from those via get(),
879 * which differ from those invoked using pool submit methods by
880 * non-workers (which comply with Future.get() specs). Internal
881 * usages of ForkJoinTasks ignore interrupt status when executing
882 * or awaiting completion. Otherwise, reporting task results or
883 * exceptions is preferred to throwing InterruptedExceptions,
884 * which are in turn preferred to timeouts. Similarly, completion
885 * status is preferred to reporting cancellation. Cancellation is
886 * reported as an unchecked exception by join(), and by worker
887 * calls to get(), but is otherwise wrapped in a (checked)
888 * ExecutionException.
889 *
890 * Worker Threads cannot be VirtualThreads, as enforced by
891 * requiring ForkJoinWorkerThreads in factories. There are
892 * several constructions relying on this. However as of this
893 * writing, virtual thread bodies are by default run as some form
894 * of InterruptibleTask.
895 *
933 * We isolate the ForkJoinPool.ctl field that otherwise causes the
934 * most false-sharing misses with respect to other fields. Also,
935 * ForkJoinPool fields are ordered such that fields less prone to
936 * contention effects are first, offsetting those that otherwise
937 * would be, while also reducing total footprint vs using
938 * multiple @Contended regions, which tends to slow down
939 * less-contended applications. To help arrange this, some
940 * non-reference fields are declared as "long" even when ints or
941 * shorts would suffice. For class WorkQueue, an
942 * embedded @Contended region segregates fields most heavily
943 * updated by owners from those most commonly read by stealers or
944 * other management.
945 *
946 * Initial sizing and resizing of WorkQueue arrays is an even more
947 * delicate tradeoff because the best strategy systematically
948 * varies across garbage collectors. Small arrays are better for
949 * locality and reduce GC scan time, but large arrays reduce both
950 * direct false-sharing and indirect cases due to GC bookkeeping
951 * (cardmarks etc), and reduce the number of resizes, which are
952 * not especially fast because they require atomic transfers.
953 * Currently, arrays are initialized to be just large enough to
954 * avoid resizing in most tree-structured tasks, but grow rapidly
955 * until large. (Maintenance note: any changes in fields, queues,
956 * or their uses, or JVM layout policies, must be accompanied by
957 * re-evaluation of these placement and sizing decisions.)
958 *
959 * Style notes
960 * ===========
961 *
962 * Memory ordering relies mainly on atomic operations (CAS,
963 * getAndSet, getAndAdd) along with moded accesses. These use
964 * jdk-internal Unsafe for atomics and special memory modes,
965 * rather than VarHandles, to avoid initialization dependencies in
966 * other jdk components that require early parallelism. This can
967 * be awkward and ugly, but also reflects the need to control
968 * outcomes across the unusual cases that arise in very racy code
969 * with very few invariants. All atomic task slot updates use
970 * Unsafe operations requiring offset positions, not indices, as
971 * computed by method slotOffset. All fields are read into locals
972 * before use, and null-checked if they are references, even if
973 * they can never be null under current usages. Usually,
974 * computations (held in local variables) are defined as soon as
975 * logically enabled, sometimes to convince compilers that they
976 * may be performed despite memory ordering constraints. Array
977 * accesses using masked indices include checks (that are always
1020 */
1021 static final long DEFAULT_KEEPALIVE = 60_000L;
1022
1023 /**
1024 * Undershoot tolerance for idle timeouts, also serving as the
1025 * minimum allowed timeout value.
1026 */
1027 static final long TIMEOUT_SLOP = 20L;
1028
1029 /**
1030 * The default value for common pool maxSpares. Overridable using
1031 * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1032 * system property. The default value is far in excess of normal
1033 * requirements, but also far short of maximum capacity and typical OS
1034 * thread limits, so allows JVMs to catch misuse/abuse before
1035 * running out of resources needed to do so.
1036 */
1037 static final int DEFAULT_COMMON_MAX_SPARES = 256;
1038
1039 /**
1040 * Initial capacity of work-stealing queue array.
1041 * Must be a power of two, at least 2. See above.
1042 */
1043 static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1044
1045 // conversions among short, int, long
1046 static final int SMASK = 0xffff; // (unsigned) short bits
1047 static final long LMASK = 0xffffffffL; // lower 32 bits of long
1048 static final long UMASK = ~LMASK; // upper 32 bits
1049
1050 // masks and sentinels for queue indices
1051 static final int MAX_CAP = 0x7fff; // max # workers
1052 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
1053 static final int INVALID_ID = 0x4000; // unused external queue id
1054
1055 // pool.runState bits
1056 static final long STOP = 1L << 0; // terminating
1057 static final long SHUTDOWN = 1L << 1; // terminate when quiescent
1058 static final long CLEANED = 1L << 2; // stopped and queues cleared
1059 static final long TERMINATED = 1L << 3; // only set if STOP also set
1060 static final long RS_LOCK = 1L << 4; // lowest seqlock bit
1061
1062 // spin/sleep limits for runState locking and elsewhere
1063 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
1064 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
1201 U.putIntOpaque(this, TOP, v);
1202 }
1203 final void updateArray(ForkJoinTask<?>[] a) {
1204 U.getAndSetReference(this, ARRAY, a);
1205 }
1206 final void unlockPhase() {
1207 U.getAndAddInt(this, PHASE, IDLE);
1208 }
1209 final boolean tryLockPhase() { // seqlock acquire
1210 int p;
1211 return (((p = phase) & IDLE) != 0 &&
1212 U.compareAndSetInt(this, PHASE, p, p + IDLE));
1213 }
1214
1215 /**
1216 * Constructor. For internal queues, most fields are initialized
1217 * upon thread start in pool.registerWorker.
1218 */
1219 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1220 boolean clearThreadLocals) {
1221 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1222 if ((this.owner = owner) == null) {
1223 phase = id | IDLE;
1224 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1225 }
1226 }
1227
1228 /**
1229 * Returns an exportable index (used by ForkJoinWorkerThread).
1230 */
1231 final int getPoolIndex() {
1232 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1233 }
1234
1235 /**
1236 * Returns the approximate number of tasks in the queue.
1237 */
1238 final int queueSize() {
1239 int unused = phase; // for ordering effect
1240 return Math.max(top - base, 0); // ignore transient negative
1241 }
1242
1243 /**
1244 * Pushes a task. Called only by owner or if already locked
1245 *
1246 * @param task the task; no-op if null
1247 * @param pool the pool to signal if was previously empty, else null
1248 * @param internal if caller owns this queue
1249 * @throws RejectedExecutionException if array could not be resized
1250 */
1251 final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1252 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1253 if ((a = array) != null && (cap = a.length) > 0) { // else disabled
1254 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1255 top = s + 1;
1256 long pos = slotOffset(m & s);
1257 if (!internal)
1258 U.putReference(a, pos, task); // inside lock
1259 else
1260 U.getAndSetReference(a, pos, task); // fully fenced
1261 if (room == 0 && (a = growArray(a, cap, s)) != null)
1262 m = a.length - 1; // resize
1263 }
1264 if (!internal)
1265 unlockPhase();
1266 if (room < 0)
1267 throw new RejectedExecutionException("Queue capacity exceeded");
1268 if (pool != null && a != null &&
1269 U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)
1270 pool.signalWork(a, m & s); // may have appeared empty
1271 }
1272 }
1273
1274 /**
1275 * Resizes the queue array unless out of memory.
1276 * @param a old array
1277 * @param cap old array capacity
1278 * @param s current top
1279 * @return new array, or null on failure
1280 */
1281 private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
1282 int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
1283 ForkJoinTask<?>[] newArray = null;
1284 if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1285 try {
1286 newArray = new ForkJoinTask<?>[newCap];
1287 } catch (OutOfMemoryError ex) {
1288 }
1289 if (newArray != null) { // else throw on next push
1290 int mask = cap - 1, newMask = newCap - 1;
1291 for (int k = s, j = cap; j > 0; --j, --k) {
1292 ForkJoinTask<?> u; // poll old, push to new
1293 if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1294 a, slotOffset(k & mask), null)) == null)
1295 break; // lost to pollers
1296 newArray[k & newMask] = u;
1297 }
1298 updateArray(newArray); // fully fenced
1299 }
1300 }
1301 return newArray;
1302 }
1303
1304 /**
1305 * Takes next task, if one exists, in order specified by mode,
1306 * so acts as either local-pop or local-poll. Called only by owner.
1307 * @param fifo nonzero if FIFO mode
1308 */
1309 private ForkJoinTask<?> nextLocalTask(int fifo) {
1310 ForkJoinTask<?> t = null;
1311 ForkJoinTask<?>[] a = array;
1312 int b = base, p = top, cap;
1313 if (p - b > 0 && a != null && (cap = a.length) > 0) {
1314 for (int m = cap - 1, s, nb;;) {
1315 if (fifo == 0 || (nb = b + 1) == p) {
1316 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1317 a, slotOffset(m & (s = p - 1)), null)) != null)
1318 updateTop(s); // else lost race for only task
1319 break;
1320 }
1321 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1733 return false;
1734 }
1735
1736 /**
1737 * Provides a name for ForkJoinWorkerThread constructor.
1738 */
1739 final String nextWorkerThreadName() {
1740 String prefix = workerNamePrefix;
1741 long tid = incrementThreadIds() + 1L;
1742 if (prefix == null) // commonPool has no prefix
1743 prefix = "ForkJoinPool.commonPool-worker-";
1744 return prefix.concat(Long.toString(tid));
1745 }
1746
1747 /**
1748 * Finishes initializing and records internal queue.
1749 *
1750 * @param w caller's WorkQueue
1751 */
1752 final void registerWorker(WorkQueue w) {
1753 if (w != null) {
1754 w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1755 ThreadLocalRandom.localInit();
1756 int seed = w.stackPred = ThreadLocalRandom.getProbe();
1757 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1758 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1759 long stop = lockRunState() & STOP;
1760 try {
1761 WorkQueue[] qs; int n;
1762 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1763 for (int k = n, m = n - 1; ; id += 2) {
1764 if (qs[id &= m] == null)
1765 break;
1766 if ((k -= 2) <= 0) {
1767 id |= n;
1768 break;
1769 }
1770 }
1771 w.phase = id | phaseSeq; // now publishable
1772 if (id < n)
1773 qs[id] = w;
1774 else { // expand
1812 do {} while (c != (c = compareAndExchangeCtl(
1813 c, ((RC_MASK & (c - RC_UNIT)) |
1814 (TC_MASK & (c - TC_UNIT)) |
1815 (LMASK & c)))));
1816 }
1817 if (phase != 0 && w != null) { // remove index unless terminating
1818 long ns = w.nsteals & 0xffffffffL;
1819 if ((runState & STOP) == 0L) {
1820 WorkQueue[] qs; int n, i;
1821 if ((lockRunState() & STOP) == 0L &&
1822 (qs = queues) != null && (n = qs.length) > 0 &&
1823 qs[i = phase & SMASK & (n - 1)] == w) {
1824 qs[i] = null;
1825 stealCount += ns; // accumulate steals
1826 }
1827 unlockRunState();
1828 }
1829 }
1830 if ((tryTerminate(false, false) & STOP) == 0L &&
1831 phase != 0 && w != null && w.source != DROPPED) {
1832 w.cancelTasks(); // clean queue
1833 signalWork(null, 0); // possibly replace
1834 }
1835 if (ex != null)
1836 ForkJoinTask.rethrow(ex);
1837 }
1838
1839 /**
1840 * Releases an idle worker, or creates one if not enough exist,
1841 * giving up if array a is nonnull and task at a[k] already taken.
1842 */
1843 final void signalWork(ForkJoinTask<?>[] a, int k) {
1844 int pc = parallelism;
1845 for (long c = ctl;;) {
1846 WorkQueue[] qs = queues;
1847 long ac = (c + RC_UNIT) & RC_MASK, nc;
1848 int sp = (int)c, i = sp & SMASK;
1849 if ((short)(c >>> RC_SHIFT) >= pc)
1850 break;
1851 if (qs == null)
1852 break;
1853 if (qs.length <= i)
1854 break;
1855 WorkQueue w = qs[i], v = null;
1856 if (sp == 0) {
1857 if ((short)(c >>> TC_SHIFT) >= pc)
1858 break;
1859 nc = ((c + TC_UNIT) & TC_MASK) | ac;
1860 }
1861 else if ((v = w) == null)
1862 break;
1863 else
1864 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
1865 if (a != null && k < a.length && k >= 0 && a[k] == null)
1866 break;
1867 if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
1868 if (v == null)
1869 createWorker();
1870 else {
1871 v.phase = sp;
1872 if (v.parking != 0)
1873 U.unpark(v.owner);
1874 }
1875 break;
1876 }
1877 }
1878 }
1879
1880 /**
1881 * Releases all waiting workers. Called only during shutdown.
1882 */
1883 private void releaseWaiters() {
1884 for (long c = ctl;;) {
1885 WorkQueue[] qs; WorkQueue v; int sp, i;
1886 if ((sp = (int)c) == 0 || (qs = queues) == null ||
1887 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
1931 else if ((e & SHUTDOWN) == 0)
1932 return 0;
1933 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1934 return 0;
1935 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1936 return 1; // enable termination
1937 else
1938 break; // restart
1939 }
1940 }
1941 }
1942
1943 /**
1944 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1945 * See above for explanation.
1946 *
1947 * @param w caller's WorkQueue (may be null on failed initialization)
1948 */
1949 final void runWorker(WorkQueue w) {
1950 if (w != null) {
1951 int phase = w.phase;
1952 int r = w.stackPred, origin = r; // seed from registerWorker
1953 int cfg = w.config, fifo = cfg & FIFO, clearLocals = cfg & CLEAR_TLS;
1954 int src = -1; // current source queue
1955 int taken = 0, ptaken = 0, staken = 0; // takes per phase and scan
1956 rescan: while ((runState & STOP) == 0L) {
1957 WorkQueue[] qs = queues;
1958 int n = (qs == null) ? 0 : qs.length;
1959 int i = origin, step = (r >>> 16) | 1;
1960 r ^= r << 13; r ^= r >>> 17; origin = r ^= r << 5; // xorshift
1961 for (int l = n; l > 0; --l, i += step) { // scan queues
1962 WorkQueue q; int j;
1963 if ((q = qs[j = i & (n - 1)]) != null) {
1964 for (;;) { // poll q
1965 ForkJoinTask<?>[] a; int cap, b, m, k;
1966 if ((a = q.array) == null || (cap = a.length) <= 0)
1967 break;
1968 long bp = slotOffset(k = (b = q.base) & (m = cap - 1));
1969 int nb = b + 1, nk = nb & m;
1970 ForkJoinTask<?> t = (ForkJoinTask<?>)
1971 U.getReferenceAcquire(a, bp);
1972 if (q.array != a || q.base != b || a[k] != t)
1973 continue; // inconsistent
1974 if (t == null) {
1975 if (taken != staken) {
1976 staken = taken;
1977 continue rescan; // sweep until clean
1978 }
1979 if (a[nk] != null || a[(b + 2) & m] != null)
1980 continue rescan; // stalled; reorder scan
1981 break; // probably empty
1982 }
1983 if (U.compareAndSetReference(a, bp, t, null)) {
1984 q.base = nb;
1985 Object nt = U.getReferenceAcquire
1986 (a, slotOffset(nk)); // confirm below
1987 ++taken;
1988 if (src != j)
1989 w.source = src = j;
1990 if (nt != null && nt == a[nk])
1991 signalWork(a, nk); // propagate
1992 w.topLevelExec(t, fifo); // run t & its subtasks
1993 }
1994 }
1995 }
1996 }
1997 if (taken != ptaken) { // end run
1998 ptaken = taken;
1999 origin = src; // hint for next run
2000 if (clearLocals != 0 &&
2001 Thread.currentThread() instanceof ForkJoinWorkerThread wt)
2002 wt.resetThreadLocals();
2003 w.nsteals = taken;
2004 }
2005 w.phase = phase += IDLE; // deactivate
2006 if ((phase = awaitWork(w, phase)) == IDLE)
2007 break;
2008 }
2009 }
2010 }
2011
2012 /**
2013 * Awaits signal or termination.
2014 *
2015 * @param w the work queue
2016 * @param p current phase (known to be idle)
2017 * @return current phase or IDLE if worker should exit
2018 */
2019 private int awaitWork(WorkQueue w, int p) {
2020 if (w == null) // never true; hoist checks
2021 return IDLE;
2022 int activePhase = p + IDLE;
2023 long ap = activePhase & LMASK, pc = ctl, qc;
2024 do { // enqueue
2025 qc = ap | ((pc - RC_UNIT) & UMASK);
2026 w.stackPred = (int)pc; // set ctl stack link
2027 } while (pc != (pc = compareAndExchangeCtl(pc, qc)));
2028 long psp = pc & LMASK; // reactivation stack prefix
2029 WorkQueue[] qs; int n; // missed signal check
2030 if ((runState & STOP) != 0 || (qs = queues) == null || (n = qs.length) <= 0)
2031 return IDLE; // already terminating
2032 for (int m = n - 1, origin = p + 1, i = 0; i < m; ++i) {
2033 WorkQueue q; long cc; // stagger origins
2034 if ((q = qs[(origin + i) & m]) != null && q.top - q.base > 0) {
2035 if ((p = w.phase) == activePhase)
2036 break;
2037 if ((int)(cc = ctl) == activePhase &&
2038 compareAndSetCtl(cc, psp | ((cc + RC_UNIT) & UMASK))) {
2039 p = w.phase = activePhase;
2040 break; // reactivated
2041 }
2042 }
2043 }
2044 if (p != activePhase && (p = w.phase) != activePhase) {
2045 long deadline = 0L, c, e; // quiescence checks
2046 if (((e = runState) & STOP) != 0)
2047 return IDLE;
2048 else if ((int)(c = ctl) != activePhase || (c & RC_MASK) > 0L) {
2049 for (int spins = n; (p = w.phase) != activePhase && --spins > 0;)
2050 Thread.onSpinWait(); // spin unless possibly quiescent
2051 }
2052 else if ((e & SHUTDOWN) != 0L && quiescent() > 0)
2053 return IDLE; // quiescent termination
2054 else { // use trim timeout
2055 long d = ((w.source != INVALID_ID) ? keepAlive :
2056 TIMEOUT_SLOP) + System.currentTimeMillis();
2057 deadline = (d == 0L)? 1L : d; // avoid zero
2058 p = w.phase;
2059 }
2060 if (p != activePhase) { // block
2061 LockSupport.setCurrentBlocker(this);
2062 w.parking = 1; // enable unpark
2063 while ((p = w.phase) != activePhase) {
2064 boolean trimmable = false; int trim;
2065 Thread.interrupted(); // clear status
2066 if ((runState & STOP) != 0L)
2067 break;
2068 if (deadline != 0L) {
2069 if ((trim = tryTrim(w, p, deadline)) > 0)
2070 break;
2071 else if (trim < 0)
2072 deadline = 0L;
2073 else
2074 trimmable = true;
2075 }
2076 U.park(trimmable, deadline);
2077 }
2078 w.parking = 0;
2079 LockSupport.setCurrentBlocker(null);
2080 if (p != activePhase)
2081 return IDLE;
2082 }
2083 }
2084 return activePhase;
2085 }
2086
2087 /**
2088 * Tries to remove and deregister worker after timeout, and release
2089 * another to do the same.
2090 * @return > 0: trimmed, < 0 : not trimmable, else 0
2091 */
2092 private int tryTrim(WorkQueue w, int phase, long deadline) {
2093 long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2094 if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2095 stat = -1; // no longer ctl top
2096 else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2097 stat = 0; // spurious wakeup
2098 else if (!compareAndSetCtl(
2099 c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2100 (TC_MASK & (c - TC_UNIT)))))
2101 stat = -1; // lost race to signaller
2102 else {
2103 stat = 1;
2104 w.source = DROPPED;
2513 else
2514 return 0;
2515 }
2516
2517 /**
2518 * Gets and removes a local or stolen task for the given worker.
2519 *
2520 * @return a task, if available
2521 */
2522 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2523 ForkJoinTask<?> t;
2524 if (w == null || (t = w.nextLocalTask()) == null)
2525 t = pollScan(false);
2526 return t;
2527 }
2528
2529 // External operations
2530
2531 /**
2532 * Finds and locks a WorkQueue for an external submitter, or
2533 * throws RejectedExecutionException if shutdown
2534 * @param rejectOnShutdown true if RejectedExecutionException
2535 * should be thrown when shutdown
2536 */
2537 final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2538 int r;
2539 if ((r = ThreadLocalRandom.getProbe()) == 0) {
2540 ThreadLocalRandom.localInit(); // initialize caller's probe
2541 r = ThreadLocalRandom.getProbe();
2542 }
2543 for (;;) {
2544 WorkQueue q; WorkQueue[] qs; int n, id, i;
2545 if ((qs = queues) == null || (n = qs.length) <= 0)
2546 break;
2547 if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2548 WorkQueue newq = new WorkQueue(null, id, 0, false);
2549 lockRunState();
2550 if (qs[i] == null && queues == qs)
2551 q = qs[i] = newq; // else lost race to install
2552 unlockRunState();
2553 }
2554 if (q != null && q.tryLockPhase()) {
2555 if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2556 q.unlockPhase(); // check while q lock held
2557 break;
2558 }
2559 return q;
2560 }
2561 r = ThreadLocalRandom.advanceProbe(r); // move
2562 }
2563 throw new RejectedExecutionException();
2564 }
2565
2566 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2567 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2568 if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2569 (wt = (ForkJoinWorkerThread)t).pool == this) {
2570 internal = true;
2571 q = wt.workQueue;
2572 }
2573 else { // find and lock queue
2574 internal = false;
2575 q = externalSubmissionQueue(true);
2576 }
2577 q.push(task, signalIfEmpty ? this : null, internal);
2578 return task;
2579 }
2580
2581 /**
2582 * Returns queue for an external thread, if one exists that has
2583 * possibly ever submitted to the given pool (nonzero probe), or
2584 * null if none.
2585 */
2586 static WorkQueue externalQueue(ForkJoinPool p) {
2587 WorkQueue[] qs; int n;
2588 int r = ThreadLocalRandom.getProbe();
2589 return (p != null && (qs = p.queues) != null &&
2590 (n = qs.length) > 0 && r != 0) ?
2591 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2592 }
2593
2594 /**
2595 * Returns external queue for common pool.
2596 */
2597 static WorkQueue commonQueue() {
2598 return externalQueue(common);
2599 }
2600
|