543 * WorkQueue field "phase" encodes the queue array id in lower
544 * bits, and otherwise acts similarly to the pool runState field:
545 * The "IDLE" bit is clear while active (either a released worker
546 * or a locked external queue), with other bits serving as a
547 * version counter to distinguish changes across multiple reads.
548 * Note that phase field updates lag queue CAS releases; seeing a
549 * non-idle phase does not guarantee that the worker is available
550 * (and so is never checked in this way).
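     *
     * A minimal sketch of the lock/version part of this encoding
     * (illustrative only, not the pool's code: the IDLE constant's
     * actual value is defined elsewhere in this file, and the
     * low-order queue id bits are omitted here):
     *
     *   import java.util.concurrent.atomic.AtomicInteger;
     *   class PhaseSketch {
     *       static final int IDLE = 1 << 16;        // assumed bit position
     *       final AtomicInteger phase = new AtomicInteger(IDLE);
     *       boolean tryLock() {          // adding IDLE while the bit is set clears
     *           int p = phase.get();     // it and carries into the version bits
     *           return (p & IDLE) != 0 && phase.compareAndSet(p, p + IDLE);
     *       }
     *       void unlock() {              // the bit is clear here, so adding it
     *           phase.getAndAdd(IDLE);   // just sets it back; the version keeps
     *       }                            // its bump from the lock
     *   }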
551 *
552 * The ctl field also serves as the basis for memory
553 * synchronization surrounding activation. This uses a more
554 * efficient version of a Dekker-like rule that task producers and
555 * consumers sync with each other by both writing/CASing ctl (even
556 * if to its current value). However, rather than CASing ctl to
557 * its current value in the common case where no action is
558 * required, we reduce write contention by ensuring that
559 * signalWork invocations are prefaced with a fully fenced memory
560 * access (which is usually needed anyway).
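     *
     * A hedged sketch of that store-then-load rule in isolation (not
     * the pool's code; the two counters stand in for a queue slot and
     * the idle-worker count carried in ctl):
     *
     *   import java.util.concurrent.atomic.AtomicInteger;
     *   class SignalSyncSketch {
     *       final AtomicInteger tasks = new AtomicInteger();
     *       final AtomicInteger idleWorkers = new AtomicInteger();
     *       void produce() {                      // fenced write, then read
     *           tasks.getAndIncrement();
     *           if (idleWorkers.get() > 0)
     *               wakeOne();
     *       }
     *       void beforeIdling() {                 // fenced write, then read
     *           idleWorkers.getAndIncrement();
     *           if (tasks.get() > 0)
     *               rescanInstead();
     *       }
     *       void wakeOne() { }                    // unpark a waiter; elided
     *       void rescanInstead() { }              // back out of idling; elided
     *   }
     *
     * Because each side performs a fully fenced write before its read,
     * at least one of them is guaranteed to see the other's update, so
     * a new task cannot be published while every worker idles unnoticed.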
561 *
562 * Signalling. Signals (in signalWork) cause new or reactivated
563 * workers to scan for tasks. Method signalWork and its callers
564 * try to approximate the unattainable goal of having the right
565 * number of workers activated for the tasks at hand, but must err
566 * on the side of too many workers vs too few to avoid stalls:
567 *
568 * * If computations are purely tree structured, it suffices for
569 * every worker to activate another when it pushes a task into
570 * an empty queue, resulting in O(log(#threads)) steps to full
571 * activation. Emptiness must be conservatively approximated,
572 * which may result in unnecessary signals. Also, to reduce
573 * resource usages in some cases, at the expense of slower
574 * startup in others, activation of an idle thread is preferred
575 * over creating a new one, here and elsewhere. (See the sketch after this list.)
576 *
577 * * At the other extreme, if "flat" tasks (those that do not in
578 * turn generate others) come in serially from only a single
579 * producer, each worker taking a task from a queue should
580 * propagate a signal if there are more tasks in that
581 * queue. This is equivalent to, but generally faster than,
582 * arranging for the stealer to take multiple tasks, re-pushing
583 * one or more on its own queue, and signalling (because its
584 * queue is empty), also resulting in logarithmic full activation
585 * time. If tasks do not engage in unbounded loops based on the
586 * actions of other workers with unknown dependencies, this form
587 * of propagation can be limited to one signal per
588 * activation (phase change). We distinguish the cases by
589 * further signalling only if the task is an InterruptibleTask
590 * (see below), which are the only supported forms of task that
591 * may do so.
592 *
593 * * Because we don't know about usage patterns (or most commonly,
594 * mixtures), we use both approaches, which present even more
595 * opportunities to over-signal. (Failure to distinguish these
596 * cases in terms of submission methods was arguably an early
597 * design mistake.) Note that in either of these contexts,
598 * signals may be (and often are) unnecessary because active
599 * workers continue scanning after running tasks without the
600 * need to be signalled (which is one reason work stealing is
601 * often faster than alternatives), so additional workers
602 * aren't needed.
603 *
604 * * For rapidly branching tasks that require full pool resources,
605 * oversignalling is OK, because signalWork will soon have no
606 * more workers to create or reactivate. But for others (mainly
607 * externally submitted tasks), overprovisioning may cause very
608 * noticeable slowdowns due to contention and resource
609 * wastage. We reduce impact by deactivating workers when
610 * queues don't have accessible tasks, but reactivating and
611 * rescanning if other tasks remain.
612 *
613 * * Despite these, signal contention and overhead effects still
614 * occur during ramp-up and ramp-down of small computations.
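     *
     * As a concrete instance of the tree-structured case in the first
     * bullet above, an ordinary divide-and-conquer task using only the
     * public API (class name and threshold are illustrative):
     *
     *   import java.util.concurrent.RecursiveTask;
     *   class SumTask extends RecursiveTask<Long> {
     *       final long[] a; final int lo, hi;
     *       SumTask(long[] a, int lo, int hi) {
     *           this.a = a; this.lo = lo; this.hi = hi;
     *       }
     *       protected Long compute() {
     *           if (hi - lo <= 1_000) {           // small enough: compute directly
     *               long s = 0L;
     *               for (int i = lo; i < hi; ++i) s += a[i];
     *               return s;
     *           }
     *           int mid = (lo + hi) >>> 1;
     *           SumTask left = new SumTask(a, lo, mid);
     *           left.fork();                      // push; signals if queue looked empty
     *           long right = new SumTask(a, mid, hi).compute();
     *           return right + left.join();
     *       }
     *   }
     *
     * Each fork() into an apparently empty queue may activate one more
     * worker, so full activation takes O(log(#threads)) signalling
     * steps without the submitter having to wake every worker itself.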
615 *
616 * Scanning. Method runWorker performs top-level scanning for (and
617 * execution of) tasks by polling a pseudo-random permutation of
618 * the array (by starting at a given index, and using a constant
619 * cyclically exhaustive stride.) It uses the same basic polling
620 * method as WorkQueue.poll(), but restarts with a different
621 * permutation on each invocation. The pseudorandom generator
622 * need not have high-quality statistical properties in the long
623 * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
624 * from ThreadLocalRandom probes, which are cheap and
625 * suffice. Each queue's polling attempts to avoid becoming stuck
626 * when other scanners/pollers stall. Scans do not otherwise
627 * explicitly take into account core affinities, loads, cache
628 * localities, etc. However, they do exploit temporal locality
629 * (which usually approximates these) by preferring to re-poll
630 * from the same queue after a successful poll before trying
631 * others, which also reduces bookkeeping, cache traffic, and
632 * scanning overhead. But it also reduces fairness, which is
633 * partially counteracted by giving up on detected interference
634 * (which also reduces contention when too many workers try to
635 * take small tasks from the same queue).
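     *
     * The scan order itself can be illustrated in isolation (a sketch;
     * "seed", "queuesLength", and "visit" are placeholders, and the
     * real loop in runWorker below interleaves polling, signalling,
     * and progress checks):
     *
     *   int n = queuesLength;                       // a power of two
     *   int r = seed;
     *   r ^= r << 13; r ^= r >>> 17; r ^= r << 5;   // Marsaglia xorshift
     *   int step = (r >>> 16) | 1;                  // odd, hence coprime to n
     *   for (int l = n, i = r; l > 0; --l, i += step)
     *       visit(i & (n - 1));                     // each index visited exactly once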
636 *
637 * Deactivation. When no tasks are found by a worker in runWorker,
638 * it tries to deactivate(), giving up (and rescanning) on "ctl"
639 * contention. To avoid missed signals during deactivation, the
640 * method rescans and reactivates if there may have been a missed
641 * signal. To reduce false-alarm reactivations
642 * while doing so, we scan multiple times (analogously to method
643 * quiescent()) before trying to reactivate. Because idle workers
644 * are often not yet blocked (parked), we use a WorkQueue field to
645 * advertise that a waiter actually needs unparking upon signal.
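     *
     * A sketch of that advertise-then-park handshake (illustrative
     * names; the real versions, awaitWork and signalWork below, also
     * deal with timeouts, interrupts, and termination):
     *
     *   import java.util.concurrent.locks.LockSupport;
     *   class ParkSketch {
     *       volatile int phase;          // changed by the releasing thread
     *       volatile int parking;        // nonzero only while possibly parked
     *       Thread owner;
     *       void await(int activePhase) {
     *           parking = 1;                         // advertise before re-checking
     *           while (phase != activePhase)
     *               LockSupport.park(this);
     *           parking = 0;
     *       }
     *       void release(int activePhase) {
     *           phase = activePhase;                 // release first,
     *           if (parking != 0)                    // then unpark only if needed
     *               LockSupport.unpark(owner);
     *       }
     *   }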
646 *
647 * Quiescence. Workers scan looking for work, giving up when they
648 * don't find any, without being sure that none are available.
649 * However, some required functionality relies on consensus about
650 * quiescence (also termination, discussed below). The count
651 * fields in ctl allow accurate discovery of states in which all
652 * workers are idle. However, because external (asynchronous)
653 * submitters are not part of this vote, these mechanisms
654 * themselves do not guarantee that the pool is in a quiescent
655 * state with respect to methods isQuiescent, shutdown (which
656 * begins termination when quiescent), helpQuiesce, and indirectly
657 * others including tryCompensate. Method quiescent() is used in
658 * all of these contexts. It provides checks that all workers are
659 * idle and there are no submissions that they could poll if they
660 * were not idle, retrying on inconsistent reads of queues and
661 * using the runState seqLock to retry on queue array updates.
662 * (It also reports quiescence if the pool is terminating.) A true
663 * report means only that there was a moment at which quiescence
664 * held. False negatives are inevitable (for example when queue
665 * indices lag updates, as described above), which is accommodated
875 * ====================
876 *
877 * Regular ForkJoinTasks manage task cancellation (method cancel)
878 * independently from the interrupt status of threads running
879 * tasks. Interrupts are issued internally only while
880 * terminating, to wake up workers and cancel queued tasks. By
881 * default, interrupts are cleared only when necessary to ensure
882 * that calls to LockSupport.park do not loop indefinitely (park
883 * returns immediately if the current thread is interrupted).
884 *
885 * To comply with ExecutorService specs, we use subclasses of
886 * abstract class InterruptibleTask for tasks that require
887 * stronger interruption and cancellation guarantees. External
888 * submitters never run these tasks, even if in the common pool
889 * (as indicated by ForkJoinTask.noUserHelp status bit).
890 * InterruptibleTasks include a "runner" field (implemented
891 * similarly to FutureTask) to support cancel(true). Upon pool
892 * shutdown, runners are interrupted so they can cancel. Since
893 * external joining callers never run these tasks, they must await
894 * cancellation by others, which can occur along several different
895 * paths. The inability to rely on caller-runs may also require
896 * extra signalling (resulting in scanning and contention), which is
897 * therefore done only conditionally in methods push and runWorker.
898 *
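     * For example, a task entering through the ExecutorService-style
     * submit() below behaves as a Future, and cancel(true) interrupts
     * its runner via the InterruptibleTask mechanism described above
     * (public API only; the pool size and sleep are arbitrary):
     *
     *   ForkJoinPool pool = new ForkJoinPool(4);
     *   Future<String> f = pool.submit(() -> {     // Callable: may block,
     *       Thread.sleep(60_000L);                 // may throw
     *       return "done";
     *   });
     *   boolean cancelled = f.cancel(true);        // may interrupt the running worker
     *   pool.shutdown();
     *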
899 * Across these APIs, rules for reporting exceptions for tasks
900 * with results accessed via join() differ from those via get(),
901 * which differ from those invoked using pool submit methods by
902 * non-workers (which comply with Future.get() specs). Internal
903 * usages of ForkJoinTasks ignore interrupt status when executing
904 * or awaiting completion. Otherwise, reporting task results or
905 * exceptions is preferred to throwing InterruptedExceptions,
906 * which are in turn preferred to timeouts. Similarly, completion
907 * status is preferred to reporting cancellation. Cancellation is
908 * reported as an unchecked exception by join(), and by worker
909 * calls to get(), but is otherwise wrapped in a (checked)
910 * ExecutionException.
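     *
     * A small illustration of the join()/get() difference for a task
     * that fails with an unchecked exception (public API only):
     *
     *   Callable<Integer> c = () -> { throw new IllegalStateException("boom"); };
     *   ForkJoinTask<Integer> t = ForkJoinTask.adapt(c);
     *   ForkJoinPool.commonPool().execute(t);
     *   try { t.join(); }                         // rethrows the unchecked cause
     *   catch (IllegalStateException ex) { }
     *   try { t.get(); }                          // wraps it, per Future.get specs
     *   catch (ExecutionException ex) { }
     *   catch (InterruptedException ex) { }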
911 *
912 * Worker Threads cannot be VirtualThreads, as enforced by
913 * requiring ForkJoinWorkerThreads in factories. There are
914 * several constructions relying on this. However as of this
915 * writing, virtual thread bodies are by default run as some form
916 * of InterruptibleTask.
917 *
944 * the placement of their fields. Cache misses and contention due
945 * to false-sharing have been observed to slow down some programs
946 * by more than a factor of four. Effects may vary across initial
947 * memory configurations, applications, and different garbage
948 * collectors and GC settings, so there is no perfect solution.
949 * Too much isolation may generate more cache misses in common
950 * cases (because some fields and slots are usually read at the
951 * same time). The @Contended annotation provides only rough
952 * control (for good reason). Similarly for relying on fields
953 * being placed in size-sorted declaration order.
954 *
955 * We isolate the ForkJoinPool.ctl field that otherwise causes the
956 * most false-sharing misses with respect to other fields. Also,
957 * ForkJoinPool fields are ordered such that fields less prone to
958 * contention effects are first, offsetting those that otherwise
959 * would be, while also reducing total footprint vs using
960 * multiple @Contended regions, which tends to slow down
961 * less-contended applications. To help arrange this, some
962 * non-reference fields are declared as "long" even when ints or
963 * shorts would suffice. For class WorkQueue, an
964 * embedded @Contended region segregates fields most heavily
965 * updated by owners from those most commonly read by stealers or
966 * other management.
967 *
968 * Initial sizing and resizing of WorkQueue arrays is an even more
969 * delicate tradeoff because the best strategy systematically
970 * varies across garbage collectors. Small arrays are better for
971 * locality and reduce GC scan time, but large arrays reduce both
972 * direct false-sharing and indirect cases due to GC bookkeeping
973 * (cardmarks etc), and reduce the number of resizes, which are
974 * not especially fast because they require atomic transfers.
975 * Currently, arrays for workers are initialized to be just large
976 * enough to avoid resizing in most tree-structured tasks, but
977 * larger for external queues where both false-sharing problems
978 * and the need for resizing are more common. (Maintenance note:
979 * any changes in fields, queues, or their uses, or JVM layout
980 * policies, must be accompanied by re-evaluation of these
981 * placement and sizing decisions.)
982 *
983 * Style notes
984 * ===========
985 *
986 * Memory ordering relies mainly on atomic operations (CAS,
987 * getAndSet, getAndAdd) along with moded accesses. These use
988 * jdk-internal Unsafe for atomics and special memory modes,
989 * rather than VarHandles, to avoid initialization dependencies in
990 * other jdk components that require early parallelism. This can
991 * be awkward and ugly, but also reflects the need to control
992 * outcomes across the unusual cases that arise in very racy code
993 * with very few invariants. All atomic task slot updates use
994 * Unsafe operations requiring offset positions, not indices, as
995 * computed by method slotOffset. All fields are read into locals
996 * before use, and null-checked if they are references, even if
997 * they can never be null under current usages. Usually,
998 * computations (held in local variables) are defined as soon as
999 * logically enabled, sometimes to convince compilers that they
1000 * may be performed despite memory ordering constraints. Array
1001 * accesses using masked indices include checks (that are always
1044 */
1045 static final long DEFAULT_KEEPALIVE = 60_000L;
1046
1047 /**
1048 * Undershoot tolerance for idle timeouts, also serving as the
1049 * minimum allowed timeout value.
1050 */
1051 static final long TIMEOUT_SLOP = 20L;
1052
1053 /**
1054 * The default value for common pool maxSpares. Overridable using
1055 * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1056 * system property. The default value is far in excess of normal
1057 * requirements, but also far short of maximum capacity and typical OS
1058 * thread limits, so allows JVMs to catch misuse/abuse before
1059 * running out of resources needed to do so.
1060 */
1061 static final int DEFAULT_COMMON_MAX_SPARES = 256;
1062
1063 /**
1064 * Initial capacity of work-stealing queue array for workers.
1065 * Must be a power of two, at least 2. See above.
1066 */
1067 static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1068
1069 /**
1070 * Initial capacity of work-stealing queue array for external queues.
1071 * Must be a power of two, at least 2. See above.
1072 */
1073 static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
1074
1075 // conversions among short, int, long
1076 static final int SMASK = 0xffff; // (unsigned) short bits
1077 static final long LMASK = 0xffffffffL; // lower 32 bits of long
1078 static final long UMASK = ~LMASK; // upper 32 bits
1079
1080 // masks and sentinels for queue indices
1081 static final int MAX_CAP = 0x7fff; // max # workers
1082 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
1083 static final int INVALID_ID = 0x4000; // unused external queue id
1084
1085 // pool.runState bits
1086 static final long STOP = 1L << 0; // terminating
1087 static final long SHUTDOWN = 1L << 1; // terminate when quiescent
1088 static final long CLEANED = 1L << 2; // stopped and queues cleared
1089 static final long TERMINATED = 1L << 3; // only set if STOP also set
1090 static final long RS_LOCK = 1L << 4; // lowest seqlock bit
1091
1092 // spin/sleep limits for runState locking and elsewhere
1093 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
1094 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
1186 static final class DefaultForkJoinWorkerThreadFactory
1187 implements ForkJoinWorkerThreadFactory {
1188 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1189 return ((pool.workerNamePrefix == null) ? // is commonPool
1190 new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1191 new ForkJoinWorkerThread(null, pool, true, false));
1192 }
1193 }
1194
1195 /**
1196 * Queues supporting work-stealing as well as external task
1197 * submission. See above for descriptions and algorithms.
1198 */
1199 static final class WorkQueue {
1200 // fields declared in order of their likely layout on most VMs
1201 final ForkJoinWorkerThread owner; // null if shared
1202 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
1203 int base; // index of next slot for poll
1204 final int config; // mode bits
1205
1206 // fields otherwise causing more unnecessary false-sharing cache misses
1207 @jdk.internal.vm.annotation.Contended("w")
1208 int top; // index of next slot for push
1209 @jdk.internal.vm.annotation.Contended("w")
1210 volatile int phase; // versioned active status
1211 @jdk.internal.vm.annotation.Contended("w")
1212 int stackPred; // pool stack (ctl) predecessor link
1213 @jdk.internal.vm.annotation.Contended("w")
1214 volatile int source; // source queue id (or DROPPED)
1215 @jdk.internal.vm.annotation.Contended("w")
1216 int nsteals; // number of steals from other queues
1217 @jdk.internal.vm.annotation.Contended("w")
1218 volatile int parking; // nonzero if parked in awaitWork
1219
1220 // Support for atomic operations
1221 private static final Unsafe U;
1222 private static final long PHASE;
1223 private static final long BASE;
1224 private static final long TOP;
1225 private static final long ARRAY;
1226
1227 final void updateBase(int v) {
1228 U.putIntVolatile(this, BASE, v);
1229 }
1230 final void updateTop(int v) {
1231 U.putIntOpaque(this, TOP, v);
1232 }
1233 final void updateArray(ForkJoinTask<?>[] a) {
1234 U.getAndSetReference(this, ARRAY, a);
1235 }
1236 final void unlockPhase() {
1237 U.getAndAddInt(this, PHASE, IDLE);
1238 }
1239 final boolean tryLockPhase() { // seqlock acquire
1240 int p;
1241 return (((p = phase) & IDLE) != 0 &&
1242 U.compareAndSetInt(this, PHASE, p, p + IDLE));
1243 }
1244
1245 /**
1246 * Constructor. For internal queues, most fields are initialized
1247 * upon thread start in pool.registerWorker.
1248 */
1249 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1250 boolean clearThreadLocals) {
1251 array = new ForkJoinTask<?>[owner == null ?
1252 INITIAL_EXTERNAL_QUEUE_CAPACITY :
1253 INITIAL_QUEUE_CAPACITY];
1254 this.owner = owner;
1255 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1256 }
1257
1258 /**
1259 * Returns an exportable index (used by ForkJoinWorkerThread).
1260 */
1261 final int getPoolIndex() {
1262 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1263 }
1264
1265 /**
1266 * Returns the approximate number of tasks in the queue.
1267 */
1268 final int queueSize() {
1269 int unused = phase; // for ordering effect
1270 return Math.max(top - base, 0); // ignore transient negative
1271 }
1272
1273 /**
1274 * Pushes a task. Called only by owner or if already locked
1275 *
1276 * @param task the task; no-op if null
1277 * @param pool the pool to signal if was previously empty, else null
1278 * @param internal if caller owns this queue
1279 * @throws RejectedExecutionException if array could not be resized
1280 */
1281 final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1282 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1283 if ((a = array) != null && (cap = a.length) > 0 && // else disabled
1284 task != null) {
1285 int pk = task.noUserHelp() + 1; // prev slot offset
1286 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1287 top = s + 1;
1288 long pos = slotOffset(m & s);
1289 if (!internal)
1290 U.putReference(a, pos, task); // inside lock
1291 else
1292 U.getAndSetReference(a, pos, task); // fully fenced
1293 if (room == 0) // resize
1294 growArray(a, cap, s);
1295 }
1296 if (!internal)
1297 unlockPhase();
1298 if (room < 0)
1299 throw new RejectedExecutionException("Queue capacity exceeded");
1300 if ((room == 0 || a[m & (s - pk)] == null) &&
1301 pool != null)
1302 pool.signalWork(); // may have appeared empty
1303 }
1304 }
1305
1306 /**
1307 * Resizes the queue array unless out of memory.
1308 * @param a old array
1309 * @param cap old array capacity
1310 * @param s current top
1311 */
1312 private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
1313 int newCap = cap << 1;
1314 if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1315 ForkJoinTask<?>[] newArray = null;
1316 try {
1317 newArray = new ForkJoinTask<?>[newCap];
1318 } catch (OutOfMemoryError ex) {
1319 }
1320 if (newArray != null) { // else throw on next push
1321 int mask = cap - 1, newMask = newCap - 1;
1322 for (int k = s, j = cap; j > 0; --j, --k) {
1323 ForkJoinTask<?> u; // poll old, push to new
1324 if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1325 a, slotOffset(k & mask), null)) == null)
1326 break; // lost to pollers
1327 newArray[k & newMask] = u;
1328 }
1329 updateArray(newArray); // fully fenced
1330 }
1331 }
1332 }
1333
1334 /**
1335 * Takes next task, if one exists, in order specified by mode,
1336 * so acts as either local-pop or local-poll. Called only by owner.
1337 * @param fifo nonzero if FIFO mode
1338 */
1339 private ForkJoinTask<?> nextLocalTask(int fifo) {
1340 ForkJoinTask<?> t = null;
1341 ForkJoinTask<?>[] a = array;
1342 int b = base, p = top, cap;
1343 if (p - b > 0 && a != null && (cap = a.length) > 0) {
1344 for (int m = cap - 1, s, nb;;) {
1345 if (fifo == 0 || (nb = b + 1) == p) {
1346 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347 a, slotOffset(m & (s = p - 1)), null)) != null)
1348 updateTop(s); // else lost race for only task
1349 break;
1350 }
1351 if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1352 a, slotOffset(m & b), null)) != null) {
1353 updateBase(nb);
1354 break;
1355 }
1356 while (b == (b = U.getIntAcquire(this, BASE)))
1357 Thread.onSpinWait(); // spin to reduce memory traffic
1358 if (p - b <= 0)
1359 break;
1360 }
1361 }
1362 return t;
1363 }
1364
1365 /**
1366 * Takes next task, if one exists, using configured mode.
1367 * (Always internal, never called for Common pool.)
1368 */
1369 final ForkJoinTask<?> nextLocalTask() {
1370 return nextLocalTask(config & FIFO);
1371 }
1372
1373 /**
1374 * Pops the given task only if it is at the current top.
1375 * @param task the task. Caller must ensure non-null.
1376 * @param internal if caller owns this queue
1377 */
1378 final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1379 boolean taken = false;
1380 ForkJoinTask<?>[] a = array;
1381 int p = top, s = p - 1, cap; long k;
1382 if (a != null && (cap = a.length) > 0 &&
1383 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1384 (internal || tryLockPhase())) {
1385 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1386 taken = true;
1387 updateTop(s);
1388 }
1389 if (!internal)
1390 unlockPhase();
1431 break; // empty
1432 if (pb == b)
1433 Thread.onSpinWait(); // stalled
1434 }
1435 else if (U.compareAndSetReference(a, k, t, null)) {
1436 updateBase(nb);
1437 return t;
1438 }
1439 }
1440 return null;
1441 }
1442
1443 // specialized execution methods
1444
1445 /**
1446 * Runs the given task, as well as remaining local tasks.
1447 */
1448 final void topLevelExec(ForkJoinTask<?> task, int fifo) {
1449 while (task != null) {
1450 task.doExec();
1451 task = nextLocalTask(fifo);
1452 }
1453 }
1454
1455 /**
1456 * Deep form of tryUnpush: Traverses from top and removes and
1457 * runs task if present.
1458 */
1459 final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1460 ForkJoinTask<?>[] a = array;
1461 int b = base, p = top, s = p - 1, d = p - b, cap;
1462 if (a != null && (cap = a.length) > 0) {
1463 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1464 long k; boolean taken;
1465 ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1466 a, k = slotOffset(i & m));
1467 if (t == null)
1468 break;
1469 if (t == task) {
1470 if (!internal && !tryLockPhase())
1471 break; // fail if locked
1561 }
1562 else if (!(t instanceof CompletableFuture
1563 .AsynchronousCompletionTask))
1564 break;
1565 if (blocker != null && blocker.isReleasable())
1566 break;
1567 if (base == b && t != null &&
1568 U.compareAndSetReference(a, k, t, null)) {
1569 updateBase(b + 1);
1570 t.doExec();
1571 }
1572 }
1573 }
1574
1575 // misc
1576
1577 /**
1578 * Cancels all local tasks. Called only by owner.
1579 */
1580 final void cancelTasks() {
1581 for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
1582 try {
1583 t.cancel(false);
1584 } catch (Throwable ignore) {
1585 }
1586 }
1587 }
1588
1589 /**
1590 * Returns true if internal and not known to be blocked.
1591 */
1592 final boolean isApparentlyUnblocked() {
1593 Thread wt; Thread.State s;
1594 return ((wt = owner) != null && (phase & IDLE) != 0 &&
1595 (s = wt.getState()) != Thread.State.BLOCKED &&
1596 s != Thread.State.WAITING &&
1597 s != Thread.State.TIMED_WAITING);
1598 }
1599
1600 static {
1601 U = Unsafe.getUnsafe();
1763 return false;
1764 }
1765
1766 /**
1767 * Provides a name for ForkJoinWorkerThread constructor.
1768 */
1769 final String nextWorkerThreadName() {
1770 String prefix = workerNamePrefix;
1771 long tid = incrementThreadIds() + 1L;
1772 if (prefix == null) // commonPool has no prefix
1773 prefix = "ForkJoinPool.commonPool-worker-";
1774 return prefix.concat(Long.toString(tid));
1775 }
1776
1777 /**
1778 * Finishes initializing and records internal queue.
1779 *
1780 * @param w caller's WorkQueue
1781 */
1782 final void registerWorker(WorkQueue w) {
1783 if (w != null && (runState & STOP) == 0L) {
1784 ThreadLocalRandom.localInit();
1785 int seed = w.stackPred = ThreadLocalRandom.getProbe();
1786 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1787 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1788 long stop = lockRunState() & STOP;
1789 try {
1790 WorkQueue[] qs; int n;
1791 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1792 for (int k = n, m = n - 1; ; id += 2) {
1793 if (qs[id &= m] == null)
1794 break;
1795 if ((k -= 2) <= 0) {
1796 id |= n;
1797 break;
1798 }
1799 }
1800 w.phase = id | phaseSeq; // now publishable
1801 if (id < n)
1802 qs[id] = w;
1803 else { // expand
1841 do {} while (c != (c = compareAndExchangeCtl(
1842 c, ((RC_MASK & (c - RC_UNIT)) |
1843 (TC_MASK & (c - TC_UNIT)) |
1844 (LMASK & c)))));
1845 }
1846 if (phase != 0 && w != null) { // remove index unless terminating
1847 long ns = w.nsteals & 0xffffffffL;
1848 if ((runState & STOP) == 0L) {
1849 WorkQueue[] qs; int n, i;
1850 if ((lockRunState() & STOP) == 0L &&
1851 (qs = queues) != null && (n = qs.length) > 0 &&
1852 qs[i = phase & SMASK & (n - 1)] == w) {
1853 qs[i] = null;
1854 stealCount += ns; // accumulate steals
1855 }
1856 unlockRunState();
1857 }
1858 }
1859 if ((tryTerminate(false, false) & STOP) == 0L &&
1860 phase != 0 && w != null && w.source != DROPPED) {
1861 signalWork(); // possibly replace
1862 w.cancelTasks(); // clean queue
1863 }
1864 if (ex != null)
1865 ForkJoinTask.rethrow(ex);
1866 }
1867
1868 /**
1869 * Releases an idle worker, or creates one if not enough exist.
1870 */
1871 final void signalWork() {
1872 int pc = parallelism;
1873 for (long c = ctl;;) {
1874 WorkQueue[] qs = queues;
1875 long ac = (c + RC_UNIT) & RC_MASK, nc;
1876 int sp = (int)c, i = sp & SMASK;
1877 if ((short)(c >>> RC_SHIFT) >= pc)
1878 break;
1879 if (qs == null)
1880 break;
1881 if (qs.length <= i)
1882 break;
1883 WorkQueue w = qs[i], v = null;
1884 if (sp == 0) {
1885 if ((short)(c >>> TC_SHIFT) >= pc)
1886 break;
1887 nc = ((c + TC_UNIT) & TC_MASK);
1888 }
1889 else if ((v = w) == null)
1890 break;
1891 else
1892 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1893 if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1894 if (v == null)
1895 createWorker();
1896 else {
1897 v.phase = sp;
1898 if (v.parking != 0)
1899 U.unpark(v.owner);
1900 }
1901 break;
1902 }
1903 }
1904 }
1905
1906 /**
1907 * Releases all waiting workers. Called only during shutdown.
1908 */
1909 private void releaseWaiters() {
1910 for (long c = ctl;;) {
1911 WorkQueue[] qs; WorkQueue v; int sp, i;
1912 if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
1957 else if ((e & SHUTDOWN) == 0)
1958 return 0;
1959 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960 return 0;
1961 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962 return 1; // enable termination
1963 else
1964 break; // restart
1965 }
1966 }
1967 }
1968
1969 /**
1970 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971 * See above for explanation.
1972 *
1973 * @param w caller's WorkQueue (may be null on failed initialization)
1974 */
1975 final void runWorker(WorkQueue w) {
1976 if (w != null) {
1977 int phase = w.phase, r = w.stackPred; // seed from registerWorker
1978 int fifo = w.config & FIFO, nsteals = 0, src = -1;
1979 for (;;) {
1980 WorkQueue[] qs;
1981 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1982 if ((runState & STOP) != 0L || (qs = queues) == null)
1983 break;
1984 int n = qs.length, i = r, step = (r >>> 16) | 1;
1985 boolean rescan = false;
1986 scan: for (int l = n; l > 0; --l, i += step) { // scan queues
1987 int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1988 if ((q = qs[j = i & (n - 1)]) != null &&
1989 (a = q.array) != null && (cap = a.length) > 0) {
1990 for (int m = cap - 1, pb = -1, b = q.base;;) {
1991 ForkJoinTask<?> t; long k;
1992 t = (ForkJoinTask<?>)U.getReferenceAcquire(
1993 a, k = slotOffset(m & b));
1994 if (b != (b = q.base) || t == null ||
1995 !U.compareAndSetReference(a, k, t, null)) {
1996 if (a[b & m] == null) {
1997 if (rescan) // end of run
1998 break scan;
1999 if (a[(b + 1) & m] == null &&
2000 a[(b + 2) & m] == null) {
2001 break; // probably empty
2002 }
2003 if (pb == (pb = b)) { // track progress
2004 rescan = true; // stalled; reorder scan
2005 break scan;
2006 }
2007 }
2008 }
2009 else {
2010 boolean propagate;
2011 int nb = q.base = b + 1, prevSrc = src;
2012 w.nsteals = ++nsteals;
2013 w.source = src = j; // volatile
2014 rescan = true;
2015 int nh = t.noUserHelp();
2016 if (propagate =
2017 (prevSrc != src || nh != 0) && a[nb & m] != null)
2018 signalWork();
2019 w.topLevelExec(t, fifo);
2020 if ((b = q.base) != nb && !propagate)
2021 break scan; // reduce interference
2022 }
2023 }
2024 }
2025 }
2026 if (!rescan) {
2027 if (((phase = deactivate(w, phase)) & IDLE) != 0)
2028 break;
2029 src = -1; // re-enable propagation
2030 }
2031 }
2032 }
2033 }
2034
2035 /**
2036 * Deactivates and if necessary awaits signal or termination.
2037 *
2038 * @param w the worker
2039 * @param phase current phase
2040 * @return current phase, with IDLE set if worker should exit
2041 */
2042 private int deactivate(WorkQueue w, int phase) {
2043 if (w == null) // currently impossible
2044 return IDLE;
2045 int p = phase | IDLE, activePhase = phase + (IDLE << 1);
2046 long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
2047 int sp = w.stackPred = (int)pc; // set ctl stack link
2048 w.phase = p;
2049 if (!compareAndSetCtl(pc, qc)) // try to enqueue
2050 return w.phase = phase; // back out on possible signal
2051 int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
2052 if (((e = runState) & STOP) != 0L ||
2053 ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
2054 (qs = queues) == null || (n = qs.length) <= 0)
2055 return IDLE; // terminating
2056
2057 for (int prechecks = Math.min(ac, 2), // reactivation threshold
2058 k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
2059 WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
2060 if (w.phase == activePhase)
2061 return activePhase;
2062 if (--k < 0)
2063 return awaitWork(w, p); // block, drop, or exit
2064 if ((q = qs[k & (n - 1)]) == null)
2065 Thread.onSpinWait();
2066 else if ((a = q.array) != null && (cap = a.length) > 0 &&
2067 a[q.base & (cap - 1)] != null && --prechecks < 0 &&
2068 (int)(c = ctl) == activePhase &&
2069 compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
2070 return w.phase = activePhase; // reactivate
2071 }
2072 }
2073
2074 /**
2075 * Awaits signal or termination.
2076 *
2077 * @param w the work queue
2078 * @param p current phase (known to be idle)
2079 * @return current phase, with IDLE set if worker should exit
2080 */
2081 private int awaitWork(WorkQueue w, int p) {
2082 if (w != null) {
2083 ForkJoinWorkerThread t; long deadline;
2084 if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
2085 t.resetThreadLocals(); // clear before reactivate
2086 if ((ctl & RC_MASK) > 0L)
2087 deadline = 0L;
2088 else if ((deadline =
2089 (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
2090 System.currentTimeMillis()) == 0L)
2091 deadline = 1L; // avoid zero
2092 int activePhase = p + IDLE;
2093 if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
2094 LockSupport.setCurrentBlocker(this);
2095 w.parking = 1; // enable unpark
2096 while ((p = w.phase) != activePhase) {
2097 boolean trimmable = false; int trim;
2098 Thread.interrupted(); // clear status
2099 if ((runState & STOP) != 0L)
2100 break;
2101 if (deadline != 0L) {
2102 if ((trim = tryTrim(w, p, deadline)) > 0)
2103 break;
2104 else if (trim < 0)
2105 deadline = 0L;
2106 else
2107 trimmable = true;
2108 }
2109 U.park(trimmable, deadline);
2110 }
2111 w.parking = 0;
2112 LockSupport.setCurrentBlocker(null);
2113 }
2114 }
2115 return p;
2116 }
2117
2118 /**
2119 * Tries to remove and deregister worker after timeout, and release
2120 * another to do the same.
2121 * @return > 0: trimmed, < 0 : not trimmable, else 0
2122 */
2123 private int tryTrim(WorkQueue w, int phase, long deadline) {
2124 long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2125 if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2126 stat = -1; // no longer ctl top
2127 else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2128 stat = 0; // spurious wakeup
2129 else if (!compareAndSetCtl(
2130 c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2131 (TC_MASK & (c - TC_UNIT)))))
2132 stat = -1; // lost race to signaller
2133 else {
2134 stat = 1;
2135 w.source = DROPPED;
2136 w.phase = activePhase;
2137 if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2138 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2139 compareAndSetCtl( // try to wake up next waiter
2140 nc, ((UMASK & (nc + RC_UNIT)) |
2141 (nc & TC_MASK) | (v.stackPred & LMASK)))) {
2142 v.source = INVALID_ID; // enable cascaded timeouts
2143 v.phase = vp;
2144 U.unpark(v.owner);
2145 }
2146 }
2147 return stat;
2544 else
2545 return 0;
2546 }
2547
2548 /**
2549 * Gets and removes a local or stolen task for the given worker.
2550 *
2551 * @return a task, if available
2552 */
2553 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554 ForkJoinTask<?> t;
2555 if (w == null || (t = w.nextLocalTask()) == null)
2556 t = pollScan(false);
2557 return t;
2558 }
2559
2560 // External operations
2561
2562 /**
2563 * Finds and locks a WorkQueue for an external submitter, or
2564 * throws RejectedExecutionException if shutdown or terminating.
2565 * @param r current ThreadLocalRandom.getProbe() value
2566 * @param rejectOnShutdown true if RejectedExecutionException
2567 * should be thrown when shutdown (else only if terminating)
2568 */
2569 private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
2570 int reuse; // nonzero if prefer create
2571 if ((reuse = r) == 0) {
2572 ThreadLocalRandom.localInit(); // initialize caller's probe
2573 r = ThreadLocalRandom.getProbe();
2574 }
2575 for (int probes = 0; ; ++probes) {
2576 int n, i, id; WorkQueue[] qs; WorkQueue q;
2577 if ((qs = queues) == null)
2578 break;
2579 if ((n = qs.length) <= 0)
2580 break;
2581 if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582 WorkQueue w = new WorkQueue(null, id, 0, false);
2583 w.phase = id;
2584 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585 rejectOnShutdown);
2586 if (!reject && queues == qs && qs[i] == null)
2587 q = qs[i] = w; // else lost race to install
2588 unlockRunState();
2589 if (q != null)
2590 return q;
2591 if (reject)
2592 break;
2593 reuse = 0;
2594 }
2595 if (reuse == 0 || !q.tryLockPhase()) { // move index
2596 if (reuse == 0) {
2597 if (probes >= n >> 1)
2598 reuse = r; // stop preferring free slot
2599 }
2600 else if (q != null)
2601 reuse = 0; // probe on collision
2602 r = ThreadLocalRandom.advanceProbe(r);
2603 }
2604 else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605 q.unlockPhase(); // check while q lock held
2606 break;
2607 }
2608 else
2609 return q;
2610 }
2611 throw new RejectedExecutionException();
2612 }
2613
2614 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616 if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617 (wt = (ForkJoinWorkerThread)t).pool == this) {
2618 internal = true;
2619 q = wt.workQueue;
2620 }
2621 else { // find and lock queue
2622 internal = false;
2623 q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624 }
2625 q.push(task, signalIfEmpty ? this : null, internal);
2626 return task;
2627 }
2628
2629 /**
2630 * Returns queue for an external submission, bypassing call to
2631 * submissionQueue if already established and unlocked.
2632 */
2633 final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634 WorkQueue[] qs; WorkQueue q; int n;
2635 int r = ThreadLocalRandom.getProbe();
2636 return (((qs = queues) != null && (n = qs.length) > 0 &&
2637 (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638 q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639 }
2640
2641 /**
2642 * Returns queue for an external thread, if one exists that has
2643 * possibly ever submitted to the given pool (nonzero probe), or
2644 * null if none.
2645 */
2646 static WorkQueue externalQueue(ForkJoinPool p) {
2647 WorkQueue[] qs; int n;
2648 int r = ThreadLocalRandom.getProbe();
2649 return (p != null && (qs = p.queues) != null &&
2650 (n = qs.length) > 0 && r != 0) ?
2651 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2652 }
2653
2654 /**
2655 * Returns external queue for common pool.
2656 */
2657 static WorkQueue commonQueue() {
2658 return externalQueue(common);
2659 }
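    // Usage-level note: these external paths are how non-worker threads
    // reach the pool; for example (public API, illustrative values):
    //
    //   ForkJoinTask<Integer> t = ForkJoinTask.adapt(() -> 2 + 2);
    //   t.fork();         // caller is not a worker, so fork() reaches the
    //                     // common pool through its external submission path
    //   int four = t.join();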
2660
543 * WorkQueue field "phase" encodes the queue array id in lower
544 * bits, and otherwise acts similarly to the pool runState field:
545 * The "IDLE" bit is clear while active (either a released worker
546 * or a locked external queue), with other bits serving as a
547 * version counter to distinguish changes across multiple reads.
548 * Note that phase field updates lag queue CAS releases; seeing a
549 * non-idle phase does not guarantee that the worker is available
550 * (and so is never checked in this way).
551 *
552 * The ctl field also serves as the basis for memory
553 * synchronization surrounding activation. This uses a more
554 * efficient version of a Dekker-like rule that task producers and
555 * consumers sync with each other by both writing/CASing ctl (even
556 * if to its current value). However, rather than CASing ctl to
557 * its current value in the common case where no action is
558 * required, we reduce write contention by ensuring that
559 * signalWork invocations are prefaced with a fully fenced memory
560 * access (which is usually needed anyway).
561 *
562 * Signalling. Signals (in signalWork) cause new or reactivated
563 * workers to scan for tasks. SignalWork is invoked in two cases:
564 * (1) When a task is pushed onto an empty queue, and (2) When a
565 * worker takes a top-level task from a queue that has additional
566 * tasks. Together, these suffice in O(log(#threads)) steps to
567 * fully activate with at least enough workers, and ideally no
568 * more than required. This ideal is unobtainable: Callers do not
569 * know whether another worker will finish its current task and
570 * poll for others without need of a signal (which is otherwise an
571 * advantage of work-stealing vs other schemes), and also must
572 * conservatively estimate the triggering conditions of emptiness
573 * or non-emptiness, all of which usually cause more activations
574 * than necessary (see below). (Method signalWork is also used as a
575 * failsafe in case of Thread failures in deregisterWorker, to
576 * activate or create a new worker to replace them).
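     *
     * Case (2) matters most for "flat" workloads, e.g. a single
     * external producer feeding many small independent tasks (public
     * API; the loop body is illustrative):
     *
     *   ForkJoinPool pool = new ForkJoinPool();
     *   for (int i = 0; i < 100_000; ++i) {
     *       final int n = i;
     *       pool.execute(() -> System.out.println(n));
     *   }
     *
     * Here each worker that takes a task from a still-nonempty queue
     * propagates one more signal, again giving logarithmic ramp-up
     * without the producer having to signal every worker itself.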
577 *
578 * Top-Level scheduling
579 * ====================
580 *
581 * Scanning. Method runWorker performs top-level scanning for (and
582 * execution of) tasks by polling a pseudo-random permutation of
583 * the array (by starting at a given index, and using a constant
584 * cyclically exhaustive stride.) It uses the same basic polling
585 * method as WorkQueue.poll(), but restarts with a different
586 * permutation on each rescan. The pseudorandom generator need
587 * not have high-quality statistical properties in the long
588 * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
589 * from ThreadLocalRandom probes, which are cheap and suffice.
590 *
591 * Deactivation. When no tasks are found by a worker in runWorker,
592 * it invokes awaitWork, which first deactivates (to an IDLE
593 * phase). Avoiding missed signals during deactivation requires a
594 * (conservative) rescan, reactivating if there may be tasks to
595 * poll. Because idle workers are often not yet blocked (parked),
596 * we use a WorkQueue field to advertise that a waiter actually
597 * needs unparking upon signal.
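     *
     * The overall shape, with illustrative helper names (the real code
     * also links the worker onto the ctl stack and handles timeouts
     * and termination):
     *
     *   markIdle();                      // publish the idle phase first
     *   if (someQueueLooksNonEmpty())    // conservative rescan afterwards,
     *       tryReactivate();             // so an in-between signal is not lost
     *   else
     *       parkAdvertisingWaiter();     // see the parking field note above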
598 *
599 * When tasks are constructed as (recursive) DAGs, top-level
600 * scanning is usually infrequent, and doesn't encounter most
601 * of the following problems addressed by runWorker and awaitWork:
602 *
603 * Locality. Polls are organized into "runs", continuing until
604 * empty or contended, while also minimizing interference by
605 * postponing bookkeeping to ends of runs. This may reduce
606 * fairness.
607 *
608 * Contention. When many workers try to poll few queues, they
609 * often collide, generating CAS failures and disrupting locality
610 * of workers already running their tasks. This also leads to
611 * stalls when tasks cannot be taken because other workers have
612 * not finished poll operations, which is detected by reading
613 * ahead in queue arrays. In both cases, workers restart scans in a
614 * way that approximates randomized backoff.
615 *
616 * Oversignalling. When many short top-level tasks are present in
617 * a small number of queues, the above signalling strategy may
618 * activate many more workers than needed, worsening locality and
619 * contention problems, while also generating more global
620 * contention (field ctl is CASed on every activation and
621 * deactivation). We filter out (both in runWorker and
622 * signalWork) attempted signals that are surely not needed
623 * because the signalled tasks are already taken.
624 *
625 * Shutdown and Quiescence
626 * =======================
627 *
628 * Quiescence. Workers scan looking for work, giving up when they
629 * don't find any, without being sure that none are available.
630 * However, some required functionality relies on consensus about
631 * quiescence (also termination, discussed below). The count
632 * fields in ctl allow accurate discovery of states in which all
633 * workers are idle. However, because external (asynchronous)
634 * submitters are not part of this vote, these mechanisms
635 * themselves do not guarantee that the pool is in a quiescent
636 * state with respect to methods isQuiescent, shutdown (which
637 * begins termination when quiescent), helpQuiesce, and indirectly
638 * others including tryCompensate. Method quiescent() is used in
639 * all of these contexts. It provides checks that all workers are
640 * idle and there are no submissions that they could poll if they
641 * were not idle, retrying on inconsistent reads of queues and
642 * using the runState seqLock to retry on queue array updates.
643 * (It also reports quiescence if the pool is terminating.) A true
644 * report means only that there was a moment at which quiescence
645 * held. False negatives are inevitable (for example when queue
646 * indices lag updates, as described above), which is accommodated
856 * ====================
857 *
858 * Regular ForkJoinTasks manage task cancellation (method cancel)
859 * independently from the interrupt status of threads running
860 * tasks. Interrupts are issued internally only while
861 * terminating, to wake up workers and cancel queued tasks. By
862 * default, interrupts are cleared only when necessary to ensure
863 * that calls to LockSupport.park do not loop indefinitely (park
864 * returns immediately if the current thread is interrupted).
865 *
866 * To comply with ExecutorService specs, we use subclasses of
867 * abstract class InterruptibleTask for tasks that require
868 * stronger interruption and cancellation guarantees. External
869 * submitters never run these tasks, even if in the common pool
870 * (as indicated by ForkJoinTask.noUserHelp status bit).
871 * InterruptibleTasks include a "runner" field (implemented
872 * similarly to FutureTask) to support cancel(true). Upon pool
873 * shutdown, runners are interrupted so they can cancel. Since
874 * external joining callers never run these tasks, they must await
875 * cancellation by others, which can occur along several different
876 * paths.
877 *
878 * Across these APIs, rules for reporting exceptions for tasks
879 * with results accessed via join() differ from those via get(),
880 * which differ from those invoked using pool submit methods by
881 * non-workers (which comply with Future.get() specs). Internal
882 * usages of ForkJoinTasks ignore interrupt status when executing
883 * or awaiting completion. Otherwise, reporting task results or
884 * exceptions is preferred to throwing InterruptedExceptions,
885 * which are in turn preferred to timeouts. Similarly, completion
886 * status is preferred to reporting cancellation. Cancellation is
887 * reported as an unchecked exception by join(), and by worker
888 * calls to get(), but is otherwise wrapped in a (checked)
889 * ExecutionException.
890 *
891 * Worker Threads cannot be VirtualThreads, as enforced by
892 * requiring ForkJoinWorkerThreads in factories. There are
893 * several constructions relying on this. However as of this
894 * writing, virtual thread bodies are by default run as some form
895 * of InterruptibleTask.
896 *
923 * the placement of their fields. Cache misses and contention due
924 * to false-sharing have been observed to slow down some programs
925 * by more than a factor of four. Effects may vary across initial
926 * memory configurations, applications, and different garbage
927 * collectors and GC settings, so there is no perfect solution.
928 * Too much isolation may generate more cache misses in common
929 * cases (because some fields and slots are usually read at the
930 * same time). The @Contended annotation provides only rough
931 * control (for good reason). Similarly for relying on fields
932 * being placed in size-sorted declaration order.
933 *
934 * We isolate the ForkJoinPool.ctl field that otherwise causes the
935 * most false-sharing misses with respect to other fields. Also,
936 * ForkJoinPool fields are ordered such that fields less prone to
937 * contention effects are first, offsetting those that otherwise
938 * would be, while also reducing total footprint vs using
939 * multiple @Contended regions, which tends to slow down
940 * less-contended applications. To help arrange this, some
941 * non-reference fields are declared as "long" even when ints or
942 * shorts would suffice. For class WorkQueue, an
943 * embedded @Contended isolates the very busy top index, and
944 * another segregates status and bookkeeping fields written
945 * (mostly) by owners, that otherwise interfere with reading
946 * array, top, and base fields. There are other variables commonly
947 * contributing to false-sharing-related performance issues
948 * (including fields of class Thread), but we can't do much about
949 * this except try to minimize access.
950 *
951 * Initial sizing and resizing of WorkQueue arrays is an even more
952 * delicate tradeoff because the best strategy systematically
953 * varies across garbage collectors. Small arrays are better for
954 * locality and reduce GC scan time, but large arrays reduce both
955 * direct false-sharing and indirect cases due to GC bookkeeping
956 * (cardmarks etc), and reduce the number of resizes, which are
957 * not especially fast because they require atomic transfers.
958 * Currently, arrays are initialized to be just large enough to
959 * avoid resizing in most tree-structured tasks, but grow rapidly
960 * until large. (Maintenance note: any changes in fields, queues,
961 * or their uses, or JVM layout policies, must be accompanied by
962 * re-evaluation of these placement and sizing decisions.)
963 *
964 * Style notes
965 * ===========
966 *
967 * Memory ordering relies mainly on atomic operations (CAS,
968 * getAndSet, getAndAdd) along with moded accesses. These use
969 * jdk-internal Unsafe for atomics and special memory modes,
970 * rather than VarHandles, to avoid initialization dependencies in
971 * other jdk components that require early parallelism. This can
972 * be awkward and ugly, but also reflects the need to control
973 * outcomes across the unusual cases that arise in very racy code
974 * with very few invariants. All atomic task slot updates use
975 * Unsafe operations requiring offset positions, not indices, as
976 * computed by method slotOffset. All fields are read into locals
977 * before use, and null-checked if they are references, even if
978 * they can never be null under current usages. Usually,
979 * computations (held in local variables) are defined as soon as
980 * logically enabled, sometimes to convince compilers that they
981 * may be performed despite memory ordering constraints. Array
982 * accesses using masked indices include checks (that are always
1025 */
1026 static final long DEFAULT_KEEPALIVE = 60_000L;
1027
1028 /**
1029 * Undershoot tolerance for idle timeouts, also serving as the
1030 * minimum allowed timeout value.
1031 */
1032 static final long TIMEOUT_SLOP = 20L;
1033
1034 /**
1035 * The default value for common pool maxSpares. Overridable using
1036 * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1037 * system property. The default value is far in excess of normal
1038 * requirements, but also far short of maximum capacity and typical OS
1039 * thread limits, so allows JVMs to catch misuse/abuse before
1040 * running out of resources needed to do so.
1041 */
1042 static final int DEFAULT_COMMON_MAX_SPARES = 256;
1043
1044 /**
1045 * Initial capacity of work-stealing queue array.
1046 * Must be a power of two, at least 2. See above.
1047 */
1048 static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1049
1050 // conversions among short, int, long
1051 static final int SMASK = 0xffff; // (unsigned) short bits
1052 static final long LMASK = 0xffffffffL; // lower 32 bits of long
1053 static final long UMASK = ~LMASK; // upper 32 bits
1054
1055 // masks and sentinels for queue indices
1056 static final int MAX_CAP = 0x7fff; // max # workers
1057 static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
1058 static final int INVALID_ID = 0x4000; // unused external queue id
1059
1060 // pool.runState bits
1061 static final long STOP = 1L << 0; // terminating
1062 static final long SHUTDOWN = 1L << 1; // terminate when quiescent
1063 static final long CLEANED = 1L << 2; // stopped and queues cleared
1064 static final long TERMINATED = 1L << 3; // only set if STOP also set
1065 static final long RS_LOCK = 1L << 4; // lowest seqlock bit
1066
1067 // spin/sleep limits for runState locking and elsewhere
1068 static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
1069 static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
1161 static final class DefaultForkJoinWorkerThreadFactory
1162 implements ForkJoinWorkerThreadFactory {
1163 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1164 return ((pool.workerNamePrefix == null) ? // is commonPool
1165 new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1166 new ForkJoinWorkerThread(null, pool, true, false));
1167 }
1168 }
1169
1170 /**
1171 * Queues supporting work-stealing as well as external task
1172 * submission. See above for descriptions and algorithms.
1173 */
1174 static final class WorkQueue {
1175 // fields declared in order of their likely layout on most VMs
1176 final ForkJoinWorkerThread owner; // null if shared
1177 ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
1178 int base; // index of next slot for poll
1179 final int config; // mode bits
1180
1181 @jdk.internal.vm.annotation.Contended("t") // segregate
1182 int top; // index of next slot for push
1183
1184 // fields otherwise causing more unnecessary false-sharing cache misses
1185 @jdk.internal.vm.annotation.Contended("w")
1186 volatile int phase; // versioned active status
1187 @jdk.internal.vm.annotation.Contended("w")
1188 int stackPred; // pool stack (ctl) predecessor link
1189 @jdk.internal.vm.annotation.Contended("w")
1190 volatile int parking; // nonzero if parked in awaitWork
1191 @jdk.internal.vm.annotation.Contended("w")
1192 volatile int source; // source queue id (or DROPPED)
1193 @jdk.internal.vm.annotation.Contended("w")
1194 int nsteals; // number of steals from other queues
1195
1196 // Support for atomic operations
1197 private static final Unsafe U;
1198 private static final long PHASE;
1199 private static final long BASE;
1200 private static final long TOP;
1201 private static final long ARRAY;
1202
1203 final void updateBase(int v) {
1204 U.putIntVolatile(this, BASE, v);
1205 }
1206 final void updateTop(int v) {
1207 U.putIntOpaque(this, TOP, v);
1208 }
1209 final void updateArray(ForkJoinTask<?>[] a) {
1210 U.getAndSetReference(this, ARRAY, a);
1211 }
1212 final void unlockPhase() {
1213 U.getAndAddInt(this, PHASE, IDLE);
1214 }
1215 final boolean tryLockPhase() { // seqlock acquire
1216 int p;
1217 return (((p = phase) & IDLE) != 0 &&
1218 U.compareAndSetInt(this, PHASE, p, p + IDLE));
1219 }
1220
1221 /**
1222 * Constructor. For internal queues, most fields are initialized
1223 * upon thread start in pool.registerWorker.
1224 */
1225 WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1226 boolean clearThreadLocals) {
1227 this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1228 if ((this.owner = owner) == null) {
1229 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1230 phase = id | IDLE;
1231 }
1232 }
1233
1234 /**
1235 * Returns an exportable index (used by ForkJoinWorkerThread).
1236 */
1237 final int getPoolIndex() {
1238 return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1239 }
1240
1241 /**
1242 * Returns the approximate number of tasks in the queue.
1243 */
1244 final int queueSize() {
1245 int unused = phase; // for ordering effect
1246 return Math.max(top - base, 0); // ignore transient negative
1247 }
1248
1249 /**
1250 * Pushes a task. Called only by owner or if already locked
1251 *
1252 * @param task the task; no-op if null
1253 * @param pool the pool to signal if was previously empty, else null
1254 * @param internal if caller owns this queue
1255 * @throws RejectedExecutionException if array could not be resized
1256 */
1257 final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1258 int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1259 if ((a = array) != null && (cap = a.length) > 0) { // else disabled
1260 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1261 top = s + 1;
1262 long pos = slotOffset(m & s);
1263 if (!internal)
1264 U.putReference(a, pos, task); // inside lock
1265 else
1266 U.getAndSetReference(a, pos, task); // fully fenced
1267 if (room == 0 && (a = growArray(a, cap, s)) != null)
1268 m = a.length - 1; // resize
1269 }
1270 if (!internal)
1271 unlockPhase();
1272 if (room < 0)
1273 throw new RejectedExecutionException("Queue capacity exceeded");
1274 if (pool != null && a != null &&
1275 U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)
1276 pool.signalWork(a, m & s); // may have appeared empty
1277 }
1278 }
1279
1280 /**
1281 * Resizes the queue array unless out of memory.
1282 * @param a old array
1283 * @param cap old array capacity
1284 * @param s current top
1285 * @return new array, or null on failure
1286 */
1287 private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
1288 int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
1289 ForkJoinTask<?>[] newArray = null;
1290 if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1291 try {
1292 newArray = new ForkJoinTask<?>[newCap];
1293 } catch (OutOfMemoryError ex) {
1294 }
1295 if (newArray != null) { // else throw on next push
1296 int mask = cap - 1, newMask = newCap - 1;
1297 for (int k = s, j = cap; j > 0; --j, --k) {
1298 ForkJoinTask<?> u; // poll old, push to new
1299 if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1300 a, slotOffset(k & mask), null)) == null)
1301 break; // lost to pollers
1302 newArray[k & newMask] = u;
1303 }
1304 updateArray(newArray); // fully fenced
1305 }
1306 }
1307 return newArray;
1308 }
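/*
 * Editorial worked example of the growth rule above: capacities
 * quadruple below 1 << 16 and double from there on, so 4096 -> 16384
 * while 65536 -> 131072. The transfer loop walks downward from the most
 * recently pushed slot and stops at the first null it sees, because
 * pollers empty slots from the base side, so a null implies every
 * remaining lower slot has already been claimed ("lost to pollers").
 */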
1309
1310 /**
1311 * Takes next task, if one exists, in lifo order.
1312 */
1313 private ForkJoinTask<?> localPop() {
1314 ForkJoinTask<?> t = null;
1315 int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
1316 if ((a = array) != null && (cap = a.length) > 0 &&
1317 U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
1318 (t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
1319 updateTop(s);
1320 return t;
1321 }
1322
1323 /**
1324 * Takes next task, if one exists, in fifo order.
1325 */
1326 private ForkJoinTask<?> localPoll() {
1327 ForkJoinTask<?> t = null;
1328 int p = top, cap; ForkJoinTask<?>[] a;
1329 if ((a = array) != null && (cap = a.length) > 0) {
1330 for (int b = base; p - b > 0; ) {
1331 int nb = b + 1;
1332 long k = slotOffset((cap - 1) & b);
1333 if (U.getReference(a, k) == null) {
1334 if (nb == p)
1335 break; // else base is lagging
1336 while (b == (b = U.getIntAcquire(this, BASE)))
1337 Thread.onSpinWait(); // spin to reduce memory traffic
1338 }
1339 else if ((t = (ForkJoinTask<?>)
1340 U.getAndSetReference(a, k, null)) != null) {
1341 updateBase(nb);
1342 break;
1343 }
1344 else
1345 b = base;
1346 }
1347 }
1348 return t;
1349 }
1350
1351 /**
1352 * Takes next task, if one exists, using configured mode.
1353 */
1354 final ForkJoinTask<?> nextLocalTask() {
1355 return (config & FIFO) == 0 ? localPop() : localPoll();
1356 }
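/*
 * Editorial usage note: the FIFO bit mirrors the pool's asyncMode
 * constructor argument, so for example
 *
 *   ForkJoinPool p = new ForkJoinPool(
 *       4, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
 *       null, true);          // asyncMode: local queues processed FIFO
 *
 * makes nextLocalTask (and topLevelExec) drain via localPoll, while the
 * default asyncMode == false keeps LIFO localPop, which better suits
 * join-based divide-and-conquer usage.
 */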
1357
1358 /**
1359 * Pops the given task only if it is at the current top.
1360 * @param task the task. Caller must ensure non-null.
1361 * @param internal if caller owns this queue
1362 */
1363 final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1364 boolean taken = false;
1365 ForkJoinTask<?>[] a = array;
1366 int p = top, s = p - 1, cap; long k;
1367 if (a != null && (cap = a.length) > 0 &&
1368 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1369 (internal || tryLockPhase())) {
1370 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1371 taken = true;
1372 updateTop(s);
1373 }
1374 if (!internal)
1375 unlockPhase();
1416 break; // empty
1417 if (pb == b)
1418 Thread.onSpinWait(); // stalled
1419 }
1420 else if (U.compareAndSetReference(a, k, t, null)) {
1421 updateBase(nb);
1422 return t;
1423 }
1424 }
1425 return null;
1426 }
1427
1428 // specialized execution methods
1429
1430 /**
1431 * Runs the given task, as well as remaining local tasks.
1432 */
1433 final void topLevelExec(ForkJoinTask<?> task, int fifo) {
1434 while (task != null) {
1435 task.doExec();
1436 task = (fifo == 0) ? localPop() : localPoll();
1437 }
1438 }
1439
1440 /**
1441 * Deep form of tryUnpush: traverses from the top and, if the given
1442 * task is present, removes and runs it.
1443 */
1444 final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1445 ForkJoinTask<?>[] a = array;
1446 int b = base, p = top, s = p - 1, d = p - b, cap;
1447 if (a != null && (cap = a.length) > 0) {
1448 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1449 long k; boolean taken;
1450 ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1451 a, k = slotOffset(i & m));
1452 if (t == null)
1453 break;
1454 if (t == task) {
1455 if (!internal && !tryLockPhase())
1456 break; // fail if locked
1546 }
1547 else if (!(t instanceof CompletableFuture
1548 .AsynchronousCompletionTask))
1549 break;
1550 if (blocker != null && blocker.isReleasable())
1551 break;
1552 if (base == b && t != null &&
1553 U.compareAndSetReference(a, k, t, null)) {
1554 updateBase(b + 1);
1555 t.doExec();
1556 }
1557 }
1558 }
1559
1560 // misc
1561
1562 /**
1563 * Cancels all local tasks. Called only by owner.
1564 */
1565 final void cancelTasks() {
1566 for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
1567 try {
1568 t.cancel(false);
1569 } catch (Throwable ignore) {
1570 }
1571 }
1572 }
1573
1574 /**
1575 * Returns true if internal and not known to be blocked.
1576 */
1577 final boolean isApparentlyUnblocked() {
1578 Thread wt; Thread.State s;
1579 return ((wt = owner) != null && (phase & IDLE) != 0 &&
1580 (s = wt.getState()) != Thread.State.BLOCKED &&
1581 s != Thread.State.WAITING &&
1582 s != Thread.State.TIMED_WAITING);
1583 }
1584
1585 static {
1586 U = Unsafe.getUnsafe();
1748 return false;
1749 }
1750
1751 /**
1752 * Provides a name for ForkJoinWorkerThread constructor.
1753 */
1754 final String nextWorkerThreadName() {
1755 String prefix = workerNamePrefix;
1756 long tid = incrementThreadIds() + 1L;
1757 if (prefix == null) // commonPool has no prefix
1758 prefix = "ForkJoinPool.commonPool-worker-";
1759 return prefix.concat(Long.toString(tid));
1760 }
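/*
 * Editorial example: with the default per-pool prefix, workers are
 * named e.g. "ForkJoinPool-1-worker-3"; common pool workers use the
 * fallback above, e.g. "ForkJoinPool.commonPool-worker-3".
 */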
1761
1762 /**
1763 * Finishes initializing and records internal queue.
1764 *
1765 * @param w caller's WorkQueue
1766 */
1767 final void registerWorker(WorkQueue w) {
1768 if (w != null) {
1769 w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1770 ThreadLocalRandom.localInit();
1771 int seed = w.stackPred = ThreadLocalRandom.getProbe();
1772 int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1773 int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1774 long stop = lockRunState() & STOP;
1775 try {
1776 WorkQueue[] qs; int n;
1777 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1778 for (int k = n, m = n - 1; ; id += 2) {
1779 if (qs[id &= m] == null)
1780 break;
1781 if ((k -= 2) <= 0) {
1782 id |= n;
1783 break;
1784 }
1785 }
1786 w.phase = id | phaseSeq; // now publishable
1787 if (id < n)
1788 qs[id] = w;
1789 else { // expand
1827 do {} while (c != (c = compareAndExchangeCtl(
1828 c, ((RC_MASK & (c - RC_UNIT)) |
1829 (TC_MASK & (c - TC_UNIT)) |
1830 (LMASK & c)))));
1831 }
1832 if (phase != 0 && w != null) { // remove index unless terminating
1833 long ns = w.nsteals & 0xffffffffL;
1834 if ((runState & STOP) == 0L) {
1835 WorkQueue[] qs; int n, i;
1836 if ((lockRunState() & STOP) == 0L &&
1837 (qs = queues) != null && (n = qs.length) > 0 &&
1838 qs[i = phase & SMASK & (n - 1)] == w) {
1839 qs[i] = null;
1840 stealCount += ns; // accumulate steals
1841 }
1842 unlockRunState();
1843 }
1844 }
1845 if ((tryTerminate(false, false) & STOP) == 0L &&
1846 phase != 0 && w != null && w.source != DROPPED) {
1847 w.cancelTasks(); // clean queue
1848 signalWork(null, 0); // possibly replace
1849 }
1850 if (ex != null)
1851 ForkJoinTask.rethrow(ex);
1852 }
1853
1854 /**
1855 * Releases an idle worker, or creates one if not enough exist,
1856 * giving up if array a is non-null and the task at a[k] has already been taken.
1857 */
1858 final void signalWork(ForkJoinTask<?>[] a, int k) {
1859 int pc = parallelism;
1860 for (long c = ctl;;) {
1861 WorkQueue[] qs = queues;
1862 long ac = (c + RC_UNIT) & RC_MASK, nc;
1863 int sp = (int)c, i = sp & SMASK;
1864 if ((short)(c >>> RC_SHIFT) >= pc)
1865 break;
1866 if (qs == null)
1867 break;
1868 if (qs.length <= i)
1869 break;
1870 WorkQueue w = qs[i], v = null;
1871 if (sp == 0) {
1872 if ((short)(c >>> TC_SHIFT) >= pc)
1873 break;
1874 nc = ((c + TC_UNIT) & TC_MASK) | ac;
1875 }
1876 else if ((v = w) == null)
1877 break;
1878 else
1879 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
1880 if (a != null && k < a.length && k >= 0 && a[k] == null)
1881 break;
1882 if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
1883 if (v == null)
1884 createWorker();
1885 else {
1886 v.phase = sp;
1887 if (v.parking != 0)
1888 U.unpark(v.owner);
1889 }
1890 break;
1891 }
1892 }
1893 }
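/*
 * Editorial sketch of the ctl decoding implied above: the low 32 bits
 * hold the phase of the most recently idled worker (the top of the
 * stack built in deactivate), and the upper halves hold the signed
 * release and total counts:
 *
 *   int sp = (int)c;                   // 0 means no idle worker to release
 *   int rc = (short)(c >>> RC_SHIFT);  // compared against parallelism
 *   int tc = (short)(c >>> TC_SHIFT);  // governs createWorker decisions
 *
 * When sp != 0, qs[sp & SMASK] is the candidate to reactivate and its
 * stackPred supplies the new stack top for the CAS.
 */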
1894
1895 /**
1896 * Releases all waiting workers. Called only during shutdown.
1897 */
1898 private void releaseWaiters() {
1899 for (long c = ctl;;) {
1900 WorkQueue[] qs; WorkQueue v; int sp, i;
1901 if ((sp = (int)c) == 0 || (qs = queues) == null ||
1902 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
1946 else if ((e & SHUTDOWN) == 0)
1947 return 0;
1948 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1949 return 0;
1950 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1951 return 1; // enable termination
1952 else
1953 break; // restart
1954 }
1955 }
1956 }
1957
1958 /**
1959 * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1960 * See above for explanation.
1961 *
1962 * @param w caller's WorkQueue (may be null on failed initialization)
1963 */
1964 final void runWorker(WorkQueue w) {
1965 if (w != null) {
1966 int r = w.stackPred; // seed from registerWorker
1967 int fifo = (int)config & FIFO;
1968 int nsteals = 0; // shadow w.nsteals
1969 boolean rescan = true;
1970 WorkQueue[] qs; int n;
1971 while ((rescan || deactivate(w) == 0) && (runState & STOP) == 0L &&
1972 (qs = queues) != null && (n = qs.length) > 0) {
1973 rescan = false;
1974 int i = r, step = (r >>> 16) | 1;
1975 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1976 scan: for (int j = n << 1; j != 0; --j, i += step) { // 2 sweeps
1977 WorkQueue q; int qid;
1978 if ((q = qs[qid = i & (n - 1)]) != null) {
1979 for (;;) { // poll queue q
1980 ForkJoinTask<?>[] a; int cap, b, m, nb, nk;
1981 if ((a = q.array) == null || (cap = a.length) <= 0)
1982 break;
1983 long bp = slotOffset((m = cap - 1) & (b = q.base));
1984 long np = slotOffset(nk = m & (nb = b + 1));
1985 ForkJoinTask<?> t = (ForkJoinTask<?>)
1986 U.getReferenceAcquire(a, bp);
1987 if (q.array != a || q.base != b ||
1988 U.getReference(a, bp) != t)
1989 continue; // inconsistent
1990 if (t == null) {
1991 if (rescan) { // end of run
1992 w.nsteals = nsteals;
1993 break scan;
1994 }
1995 if (U.getReference(a, np) != null) {
1996 rescan = true; // stalled; reorder scan
1997 break scan;
1998 }
1999 break; // probably empty
2000 }
2001 if (U.compareAndSetReference(a, bp, t, null)) {
2002 q.base = nb;
2003 Object nt = U.getReferenceAcquire(a, np);
2004 if (!rescan) { // begin run
2005 rescan = true;
2006 w.source = qid;
2007 }
2008 ++nsteals;
2009 if (nt != null && // confirm a[nk]
2010 U.getReference(a, np) == nt)
2011 signalWork(a, nk); // propagate
2012 w.topLevelExec(t, fifo); // run t & its subtasks
2013 }
2014 }
2015 }
2016 }
2017 }
2018 }
2019 }
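/*
 * Editorial note on the scan loop above: r advances by a 32-bit
 * xorshift step, and queue indices are visited as i, i + step,
 * i + 2*step, ... masked by (n - 1). Because step = (r >>> 16) | 1 is
 * odd and n is a power of two, the stride is coprime to n, so the
 * j = n << 1 iterations amount to two full sweeps over all queue slots
 * before the worker considers deactivating.
 */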
2020
2021 /**
2022 * Deactivates and awaits signal or termination.
2023 *
2024 * @param w the work queue
2025 * @return zero if now active
2026 */
2027 private int deactivate(WorkQueue w) {
2028 int idle = 1;
2029 if (w != null) { // always true; hoist checks
2030 int inactive = w.phase |= IDLE; // set status
2031 int activePhase = inactive + IDLE; // phase value when reactivated
2032 long ap = activePhase & LMASK, pc = ctl, qc;
2033 do { // enqueue
2034 qc = ap | ((pc - RC_UNIT) & UMASK);
2035 w.stackPred = (int)pc; // set ctl stack link
2036 } while (pc != (pc = compareAndExchangeCtl(pc, qc)));
2037
2038 WorkQueue[] qs; int n; long e;
2039 if (((e = runState) & STOP) == 0L && // quiescence checks
2040 ((e & SHUTDOWN) == 0L || (qc & RC_MASK) > 0L || quiescent() <= 0) &&
2041 (qs = queues) != null && (n = qs.length) > 1) {
2042 long psp = pc & LMASK; // ctl predecessor prefix
2043 for (int i = 1; i < n; ++i) { // scan; stagger origins
2044 WorkQueue q; long c; // missed signal check
2045 if ((q = qs[(activePhase + i) & (n - 1)]) != null &&
2046 q.top - q.base > 0) {
2047 if ((idle = w.phase - activePhase) != 0 &&
2048 (int)(c = ctl) == activePhase &&
2049 compareAndSetCtl(c, psp | ((c + RC_UNIT) & UMASK))) {
2050 w.phase = activePhase;
2051 idle = 0; // reactivated
2052 } // else ineligible or lost race
2053 break;
2054 }
2055 }
2056 if (idle != 0 && (idle = w.phase - activePhase) != 0)
2057 idle = awaitWork(w, activePhase, n);
2058 }
2059 }
2060 return idle;
2061 }
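/*
 * Editorial sketch: the enqueue loop above is a Treiber-stack push
 * encoded in ctl. Roughly:
 *
 *   w.stackPred = (int)ctl;               // link to previous stack top
 *   ctl = (activePhase & LMASK)           // this worker becomes the top
 *         | ((ctl - RC_UNIT) & UMASK);    // one fewer released worker
 *
 * retried via compareAndExchangeCtl until it applies; signalWork later
 * pops the stack by CASing the low bits back to v.stackPred.
 */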
2062
2063 /**
2064 * Awaits signal or termination.
2065 *
2066 * @param w the work queue
2067 * @param activePhase w's next active phase
2068 * @param qsize current size of queues array
2069 * @return zero if now active
2070 */
2071 private int awaitWork(WorkQueue w, int activePhase, int qsize) {
2072 int idle = 1;
2073 int spins = qsize | (qsize - 1); // approx traversal cost
2074 if (w != null) { // always true; hoist checks
2075 boolean trimmable; long deadline, c;
2076 long trimTime = (w.source == INVALID_ID) ? TIMEOUT_SLOP : keepAlive;
2077 if ((w.config & CLEAR_TLS) != 0 && // instanceof check always true
2078 Thread.currentThread() instanceof ForkJoinWorkerThread f)
2079 f.resetThreadLocals(); // clear while accessing thread state
2080 LockSupport.setCurrentBlocker(this);
2081 if (trimmable = (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase))
2082 deadline = trimTime + System.currentTimeMillis();
2083 else
2084 deadline = 0L;
2085 for (;;) {
2086 int s = spins, trim;
2087 Thread.interrupted(); // clear status
2088 if ((runState & STOP) != 0L)
2089 break;
2090 while ((idle = w.phase - activePhase) != 0 && --s != 0)
2091 Thread.onSpinWait(); // spin before blocking
2092 if (idle == 0)
2093 break;
2094 if (trimmable &&
2095 (trim = tryTrim(w, activePhase, deadline)) != 0) {
2096 if (trim > 0)
2097 break;
2098 trimmable = false;
2099 deadline = 0L;
2100 }
2101 w.parking = 1; // enable unpark and recheck
2102 if ((idle = w.phase - activePhase) != 0)
2103 U.park(trimmable, deadline);
2104 w.parking = 0; // close unpark window
2105 if (idle == 0 || (idle = w.phase - activePhase) == 0)
2106 break;
2107 }
2108 LockSupport.setCurrentBlocker(null);
2109 }
2110 return idle;
2111 }
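/*
 * Editorial sketch of the park/unpark handshake above, paired with
 * signalWork:
 *
 *   waiter (awaitWork)                  signaller (signalWork)
 *     w.parking = 1;                      v.phase = sp;        // reactivate
 *     if (w.phase != activePhase)         if (v.parking != 0)
 *         U.park(trimmable, deadline);        U.unpark(v.owner);
 *     w.parking = 0;
 *
 * Either the waiter observes the phase change before parking, or the
 * signaller observes parking != 0 and unparks it; spurious wakeups are
 * handled by re-checking phase (and the trim deadline) in the loop.
 */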
2112
2113 /**
2114 * Tries to remove and deregister worker after timeout, and release
2115 * another to do the same.
2116 * @return > 0: trimmed, < 0: not trimmable, else 0
2117 */
2118 private int tryTrim(WorkQueue w, int activePhase, long deadline) {
2119 long c, nc; int stat, vp, i; WorkQueue[] vs; WorkQueue v;
2120 long waitTime = deadline - System.currentTimeMillis();
2121 if ((int)(c = ctl) != activePhase || w == null)
2122 stat = -1; // no longer ctl top
2123 else if (waitTime > TIMEOUT_SLOP)
2124 stat = 0; // spurious wakeup
2125 else if (!compareAndSetCtl(
2126 c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2127 (TC_MASK & (c - TC_UNIT)))))
2128 stat = -1; // lost race to signaller
2129 else {
2130 stat = 1;
2131 w.source = DROPPED;
2132 w.phase = activePhase;
2133 if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2134 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2135 compareAndSetCtl( // try to wake up next waiter
2136 nc, ((UMASK & (nc + RC_UNIT)) |
2137 (nc & TC_MASK) | (v.stackPred & LMASK)))) {
2138 v.source = INVALID_ID; // enable cascaded timeouts
2139 v.phase = vp;
2140 U.unpark(v.owner);
2141 }
2142 }
2143 return stat;
2540 else
2541 return 0;
2542 }
2543
2544 /**
2545 * Gets and removes a local or stolen task for the given worker.
2546 *
2547 * @return a task, if available
2548 */
2549 final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2550 ForkJoinTask<?> t;
2551 if (w == null || (t = w.nextLocalTask()) == null)
2552 t = pollScan(false);
2553 return t;
2554 }
2555
2556 // External operations
2557
2558 /**
2559 * Finds and locks a WorkQueue for an external submitter, or
2560 * throws RejectedExecutionException if the pool is shut down.
2561 * @param rejectOnShutdown true if RejectedExecutionException
2562 * should be thrown when the pool is shut down
2563 */
2564 final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2565 int r;
2566 if ((r = ThreadLocalRandom.getProbe()) == 0) {
2567 ThreadLocalRandom.localInit(); // initialize caller's probe
2568 r = ThreadLocalRandom.getProbe();
2569 }
2570 for (;;) {
2571 WorkQueue q; WorkQueue[] qs; int n, id, i;
2572 if ((qs = queues) == null || (n = qs.length) <= 0)
2573 break;
2574 if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2575 WorkQueue newq = new WorkQueue(null, id, 0, false);
2576 lockRunState();
2577 if (qs[i] == null && queues == qs)
2578 q = qs[i] = newq; // else lost race to install
2579 unlockRunState();
2580 }
2581 if (q != null && q.tryLockPhase()) {
2582 if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2583 q.unlockPhase(); // check while q lock held
2584 break;
2585 }
2586 return q;
2587 }
2588 r = ThreadLocalRandom.advanceProbe(r); // move
2589 }
2590 throw new RejectedExecutionException();
2591 }
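/*
 * Editorial sketch: external submitters choose a slot from their
 * ThreadLocalRandom probe, assuming EXTERNAL_ID_MASK keeps the low bit
 * clear so these slots stay disjoint from the odd worker ids assigned
 * in registerWorker:
 *
 *   int r = ThreadLocalRandom.getProbe();       // per-thread hash
 *   int i = (r & EXTERNAL_ID_MASK) & (n - 1);   // candidate queue slot
 *
 * On lock contention the probe is advanced and another slot tried, so
 * unrelated submitters tend to spread across different queues.
 */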
2592
2593 private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2594 Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2595 if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2596 (wt = (ForkJoinWorkerThread)t).pool == this) {
2597 internal = true;
2598 q = wt.workQueue;
2599 }
2600 else { // find and lock queue
2601 internal = false;
2602 q = externalSubmissionQueue(true);
2603 }
2604 q.push(task, signalIfEmpty ? this : null, internal);
2605 return task;
2606 }
2607
2608 /**
2609 * Returns queue for an external thread, if one exists that has
2610 * possibly ever submitted to the given pool (nonzero probe), or
2611 * null if none.
2612 */
2613 static WorkQueue externalQueue(ForkJoinPool p) {
2614 WorkQueue[] qs; int n;
2615 int r = ThreadLocalRandom.getProbe();
2616 return (p != null && (qs = p.queues) != null &&
2617 (n = qs.length) > 0 && r != 0) ?
2618 qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2619 }
2620
2621 /**
2622 * Returns external queue for common pool.
2623 */
2624 static WorkQueue commonQueue() {
2625 return externalQueue(common);
2626 }
2627
|