src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
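          *
          * As a rough illustration (a hedged sketch, not a method in
          * this file; SMASK and IDLE are constants defined below), a
          * read of phase decomposes as:
          *
          *   int id       = phase & SMASK;        // queue array id bits
          *   boolean idle = (phase & IDLE) != 0;  // false while active
          *   // remaining higher bits form the version counter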
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule whereby task producers
 555      * and consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
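          *
          * In outline (a sketch of the rule above, not the exact
          * code), the store-load pattern is:
          *
          *   producer: fully fenced write/push of a task, then read
          *             ctl and signal if workers may be idle
          *   consumer: CAS ctl to activate or deactivate, then
          *             re-scan queues before blocking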
 561      *
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  Method signalWork and its callers
 564      * try to approximate the unattainable goal of having the right
 565      * number of workers activated for the tasks at hand, but must err
 566      * on the side of too many workers vs too few to avoid stalls:
 567      *
 568      *  * If computations are purely tree structured, it suffices for
 569      *    every worker to activate another when it pushes a task into
 570      *    an empty queue, resulting in O(log(#threads)) steps to full
 571      *    activation. Emptiness must be conservatively approximated,
 572      *    which may result in unnecessary signals.  Also, to reduce
 573      *    resource usages in some cases, at the expense of slower
 574      *    startup in others, activation of an idle thread is preferred
 575      *    over creating a new one, here and elsewhere.
 576      *
 577      *  * At the other extreme, if "flat" tasks (those that do not in
 578      *    turn generate others) come in serially from only a single
 579      *    producer, each worker taking a task from a queue should
 580      *    propagate a signal if there are more tasks in that
 581      *    queue. This is equivalent to, but generally faster than,
 582      *    arranging for the stealer to take multiple tasks, re-pushing one or
 583      *    more on its own queue, and signalling (because its queue is
 584      *    empty), also resulting in logarithmic full activation
 585      *    time. If tasks do not engage in unbounded loops based on
 586      *    the actions of other workers with unknown dependencies,
 587      *    this form of propagation can be limited to one signal per
 588      *    activation (phase change). We distinguish the cases by
 589      *    further signalling only if the task is an InterruptibleTask
 590      *    (see below), the only supported form of task that may
 591      *    do so.
 592      *
 593      *  * Because we don't know about usage patterns (or most commonly,
 594      *    mixtures), we use both approaches, which present even more
 595      *    opportunities to over-signal. (Failure to distinguish these
 596      *    cases in terms of submission methods was arguably an early
 597      *    design mistake.)  Note that in either of these contexts,
 598      *    signals may be (and often are) unnecessary because active
 599      *    workers continue scanning after running tasks without the
 600      *    need to be signalled (which is one reason work stealing is
 601      *    often faster than alternatives), so additional workers
 602      *    aren't needed.
 603      *
 604      *  * For rapidly branching tasks that require full pool resources,
 605      *    oversignalling is OK, because signalWork will soon have no
 606      *    more workers to create or reactivate. But for others (mainly
 607      *    externally submitted tasks), overprovisioning may cause very
 608      *    noticeable slowdowns due to contention and resource
 609      *    wastage. We reduce impact by deactivating workers when
 610      *    queues don't have accessible tasks, but reactivating and
 611      *    rescanning if other tasks remain.
 612      *
 613      *  * Despite these, signal contention and overhead effects still
 614      *    occur during ramp-up and ramp-down of small computations.
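          *
          * For a concrete sense of the tree-structured case: each
          * activated worker signals at most one other, so activations
          * roughly double per round (1, 2, 4, 8, ...), and a pool of
          * p workers approaches full activation in about ceil(log2(p))
          * rounds, the O(log(#threads)) bound noted above.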
 615      *
 616      * Scanning. Method runWorker performs top-level scanning for (and
 617      * execution of) tasks by polling a pseudo-random permutation of
 618      * the array (by starting at a given index, and using a constant
 619      * cyclically exhaustive stride).  It uses the same basic polling
 620      * method as WorkQueue.poll(), but restarts with a different
 621      * permutation on each invocation.  The pseudorandom generator
 622      * need not have high-quality statistical properties in the long
 623      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 624      * from ThreadLocalRandom probes, which are cheap and
 625      * suffice. Each queue's polling attempts to avoid becoming stuck
 626      * when other scanners/pollers stall.  Scans do not otherwise
 627      * explicitly take into account core affinities, loads, cache
 628      * localities, etc. However, they do exploit temporal locality
 629      * (which usually approximates these) by preferring to re-poll
 630      * from the same queue after a successful poll before trying
 631      * others, which also reduces bookkeeping, cache traffic, and
 632      * scanning overhead. But it also reduces fairness, which is
 633      * partially counteracted by giving up on detected interference
 634      * (which also reduces contention when too many workers try to
 635      * take small tasks from the same queue).
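          *
          * A minimal sketch of this scan ordering (illustrative only;
          * visit() is hypothetical, and n, the queue array length, is
          * always a power of two here):
          *
          *   r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
          *   int step = (r >>> 16) | 1;   // odd, so cyclically
          *                                // exhaustive mod n
          *   for (int l = n, i = r; l > 0; --l, i += step)
          *       visit(i & (n - 1));      // probe each queue once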
 636      *
 637      * Deactivation. When no tasks are found by a worker in runWorker,
 638      * it tries to deactivate(), giving up (and rescanning) on "ctl"
 639      * contention. To avoid missed signals, the method rescans after
 640      * deactivating, and reactivates if there may have been a missed
 641      * signal in the interim. To reduce false-alarm reactivations
 642      * while doing so, we scan multiple times (analogously to method
 643      * quiescent()) before trying to reactivate.  Because idle workers
 644      * are often not yet blocked (parked), we use a WorkQueue field to
 645      * advertise that a waiter actually needs unparking upon signal.
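          *
          * The resulting park/unpark handshake is, in sketch form
          * (see awaitWork and signalWork for the actual code):
          *
          *   waiter:    w.parking = 1; park (possibly repeatedly)
          *              until reactivated; then w.parking = 0
          *   signaller: set w.phase to release; then
          *              if (w.parking != 0) U.unpark(w.owner)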
 646      *
 647      * Quiescence. Workers scan looking for work, giving up when they
 648      * don't find any, without being sure that none are available.
 649      * However, some required functionality relies on consensus about
 650      * quiescence (also termination, discussed below). The count
 651      * fields in ctl allow accurate discovery of states in which all
 652      * workers are idle.  However, because external (asynchronous)
 653      * submitters are not part of this vote, these mechanisms
 654      * themselves do not guarantee that the pool is in a quiescent
 655      * state with respect to methods isQuiescent, shutdown (which
 656      * begins termination when quiescent), helpQuiesce, and indirectly
 657      * others including tryCompensate. Method quiescent() is used in
 658      * all of these contexts. It provides checks that all workers are
 659      * idle and there are no submissions that they could poll if they
 660      * were not idle, retrying on inconsistent reads of queues and
 661      * using the runState seqLock to retry on queue array updates.
 662      * (It also reports quiescence if the pool is terminating.) A true
 663      * report means only that there was a moment at which quiescence
 664      * held.  False negatives are inevitable (for example when queue
 665      * indices lag updates, as described above), which is accommodated

 875      * ====================
 876      *
 877      * Regular ForkJoinTasks manage task cancellation (method cancel)
 878      * independently from the interrupted status of threads running
 879      * tasks.  Interrupts are issued internally only while
 880      * terminating, to wake up workers and cancel queued tasks.  By
 881      * default, interrupts are cleared only when necessary to ensure
 882      * that calls to LockSupport.park do not loop indefinitely (park
 883      * returns immediately if the current thread is interrupted).
 884      *
 885      * To comply with ExecutorService specs, we use subclasses of
 886      * abstract class InterruptibleTask for tasks that require
 887      * stronger interruption and cancellation guarantees.  External
 888      * submitters never run these tasks, even if in the common pool
 889      * (as indicated by ForkJoinTask.noUserHelp status bit).
 890      * InterruptibleTasks include a "runner" field (implemented
 891      * similarly to FutureTask) to support cancel(true).  Upon pool
 892      * shutdown, runners are interrupted so they can cancel. Since
 893      * external joining callers never run these tasks, they must await
 894      * cancellation by others, which can occur along several different
 895      * paths. The inability to rely on caller-runs may also require
 896      * extra signalling (resulting in scanning and contention) so is
 897      * done only conditionally in methods push and runWorker.
 898      *
 899      * Across these APIs, rules for reporting exceptions for tasks
 900      * with results accessed via join() differ from those via get(),
 901      * which differ from those invoked using pool submit methods by
 902      * non-workers (which comply with Future.get() specs). Internal
 903      * usages of ForkJoinTasks ignore interrupted status when executing
 904      * or awaiting completion.  Otherwise, reporting task results or
 905      * exceptions is preferred to throwing InterruptedExceptions,
 906      * which are in turn preferred to timeouts. Similarly, completion
 907      * status is preferred to reporting cancellation.  Cancellation is
 908      * reported as an unchecked exception by join(), and by worker
 909      * calls to get(), but is otherwise wrapped in a (checked)
 910      * ExecutionException.
 911      *
 912      * Worker Threads cannot be VirtualThreads, as enforced by
 913      * requiring ForkJoinWorkerThreads in factories.  There are
 914      * several constructions relying on this.  However, as of this
 915      * writing, virtual thread bodies are by default run as some form
 916      * of InterruptibleTask.
 917      *

 944      * the placement of their fields. Cache misses and contention due
 945      * to false-sharing have been observed to slow down some programs
 946      * by more than a factor of four. Effects may vary across initial
 947      * memory configurations, applications, and different garbage
 948      * collectors and GC settings, so there is no perfect solution.
 949      * Too much isolation may generate more cache misses in common
 950      * cases (because some fields and slots are usually read at the
 951      * same time). The @Contended annotation provides only rough
 952      * control (for good reason). Similarly for relying on fields
 953      * being placed in size-sorted declaration order.
 954      *
 955      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 956      * most false-sharing misses with respect to other fields. Also,
 957      * ForkJoinPool fields are ordered such that fields less prone to
 958      * contention effects are first, offsetting those that otherwise
 959      * would be, while also reducing total footprint vs using
 960      * multiple @Contended regions, which tends to slow down
 961      * less-contended applications. To help arrange this, some
 962      * non-reference fields are declared as "long" even when ints or
 963      * shorts would suffice.  For class WorkQueue, an
 964      * embedded @Contended region segregates fields most heavily
 965      * updated by owners from those most commonly read by stealers or
 966      * other management.
 967      *
 968      * Initial sizing and resizing of WorkQueue arrays is an even more
 969      * delicate tradeoff because the best strategy systematically
 970      * varies across garbage collectors. Small arrays are better for
 971      * locality and reduce GC scan time, but large arrays reduce both
 972      * direct false-sharing and indirect cases due to GC bookkeeping
 973      * (cardmarks etc), and reduce the number of resizes, which are
 974      * not especially fast because they require atomic transfers.
 975      * Currently, arrays for workers are initialized to be just large
 976      * enough to avoid resizing in most tree-structured tasks, but
 977      * larger for external queues where both false-sharing problems
 978      * and the need for resizing are more common. (Maintenance note:
 979      * any changes in fields, queues, or their uses, or JVM layout
 980      * policies, must be accompanied by re-evaluation of these
 981      * placement and sizing decisions.)
 982      *
 983      * Style notes
 984      * ===========
 985      *
 986      * Memory ordering relies mainly on atomic operations (CAS,
 987      * getAndSet, getAndAdd) along with moded accesses. These use
 988      * jdk-internal Unsafe for atomics and special memory modes,
 989      * rather than VarHandles, to avoid initialization dependencies in
 990      * other jdk components that require early parallelism.  This can
 991      * be awkward and ugly, but also reflects the need to control
 992      * outcomes across the unusual cases that arise in very racy code
 993      * with very few invariants. All atomic task slot updates use
 994      * Unsafe operations requiring offset positions, not indices, as
 995      * computed by method slotOffset. All fields are read into locals
 996      * before use, and null-checked if they are references, even if
 997      * they can never be null under current usages. Usually,
 998      * computations (held in local variables) are defined as soon as
 999      * logically enabled, sometimes to convince compilers that they
1000      * may be performed despite memory ordering constraints.  Array
1001      * accesses using masked indices include checks (that are always

1044      */
1045     static final long DEFAULT_KEEPALIVE = 60_000L;
1046 
1047     /**
1048      * Undershoot tolerance for idle timeouts, also serving as the
1049      * minimum allowed timeout value.
1050      */
1051     static final long TIMEOUT_SLOP = 20L;
1052 
1053     /**
1054      * The default value for common pool maxSpares.  Overridable using
1055      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1056      * system property.  The default value is far in excess of normal
1057      * requirements, but also far short of maximum capacity and typical OS
1058      * thread limits, so allows JVMs to catch misuse/abuse before
1059      * running out of resources needed to do so.
1060      */
1061     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1062 
1063     /**
1064      * Initial capacity of work-stealing queue array for workers.
1065      * Must be a power of two, at least 2. See above.
1066      */
1067     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1068 
1069     /**
1070      * Initial capacity of work-stealing queue array for external queues.
1071      * Must be a power of two, at least 2. See above.
1072      */
1073     static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
1074 
1075     // conversions among short, int, long
1076     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1077     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1078     static final long UMASK           = ~LMASK;      // upper 32 bits
1079 
1080     // masks and sentinels for queue indices
1081     static final int MAX_CAP          = 0x7fff;   // max # workers
1082     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1083     static final int INVALID_ID       = 0x4000;   // unused external queue id
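          // Note (illustrative): EXTERNAL_ID_MASK has bit 0 clear, so
          // external (submission) queues occupy even indices, while
          // registerWorker gives workers odd ids via ((seed << 1) | 1);
          // pollScan relies on this parity to scan submissions only.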
1084 
1085     // pool.runState bits
1086     static final long STOP            = 1L <<  0;   // terminating
1087     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1088     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1089     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1090     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1091 
1092     // spin/sleep limits for runState locking and elsewhere
1093     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1094     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos

1186     static final class DefaultForkJoinWorkerThreadFactory
1187         implements ForkJoinWorkerThreadFactory {
1188         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1189             return ((pool.workerNamePrefix == null) ? // is commonPool
1190                     new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1191                     new ForkJoinWorkerThread(null, pool, true, false));
1192         }
1193     }
1194 
1195     /**
1196      * Queues supporting work-stealing as well as external task
1197      * submission. See above for descriptions and algorithms.
1198      */
1199     static final class WorkQueue {
1200         // fields declared in order of their likely layout on most VMs
1201         final ForkJoinWorkerThread owner; // null if shared
1202         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
1203         int base;                  // index of next slot for poll
1204         final int config;          // mode bits
1205 
1206         // fields otherwise causing more unnecessary false-sharing cache misses
1207         @jdk.internal.vm.annotation.Contended("w")
1208         int top;                   // index of next slot for push
1209         @jdk.internal.vm.annotation.Contended("w")
1210         volatile int phase;        // versioned active status
1211         @jdk.internal.vm.annotation.Contended("w")
1212         int stackPred;             // pool stack (ctl) predecessor link
1213         @jdk.internal.vm.annotation.Contended("w")
1214         volatile int source;       // source queue id (or DROPPED)
1215         @jdk.internal.vm.annotation.Contended("w")
1216         int nsteals;               // number of steals from other queues
1217         @jdk.internal.vm.annotation.Contended("w")
1218         volatile int parking;      // nonzero if parked in awaitWork
1219 
1220         // Support for atomic operations
1221         private static final Unsafe U;
1222         private static final long PHASE;
1223         private static final long BASE;
1224         private static final long TOP;
1225         private static final long ARRAY;
1226 
1227         final void updateBase(int v) {
1228             U.putIntVolatile(this, BASE, v);
1229         }
1230         final void updateTop(int v) {
1231             U.putIntOpaque(this, TOP, v);
1232         }
1233         final void updateArray(ForkJoinTask<?>[] a) {
1234             U.getAndSetReference(this, ARRAY, a);
1235         }
1236         final void unlockPhase() {
1237             U.getAndAddInt(this, PHASE, IDLE);
1238         }
1239         final boolean tryLockPhase() {    // seqlock acquire
1240             int p;
1241             return (((p = phase) & IDLE) != 0 &&
1242                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1243         }
1244 
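              // Usage sketch for the phase seqlock (hypothetical
              // caller; in this file, push(task, pool, false) performs
              // the unlockPhase itself):
              //
              //   if (q.tryLockPhase()) {
              //       try { /* operate on q */ }
              //       finally { q.unlockPhase(); }
              //   }
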
1245         /**
1246          * Constructor. For internal queues, most fields are initialized
1247          * upon thread start in pool.registerWorker.
1248          */
1249         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1250                   boolean clearThreadLocals) {
1251             array = new ForkJoinTask<?>[owner == null ?
1252                                         INITIAL_EXTERNAL_QUEUE_CAPACITY :
1253                                         INITIAL_QUEUE_CAPACITY];
1254             this.owner = owner;
1255             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1256         }
1257 
1258         /**
1259          * Returns an exportable index (used by ForkJoinWorkerThread).
1260          */
1261         final int getPoolIndex() {
1262             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1263         }
1264 
1265         /**
1266          * Returns the approximate number of tasks in the queue.
1267          */
1268         final int queueSize() {
1269             int unused = phase;             // for ordering effect
1270             return Math.max(top - base, 0); // ignore transient negative
1271         }
1272 
1273         /**
1274          * Pushes a task. Called only by owner or if already locked.
1275          *
1276          * @param task the task; no-op if null
1277          * @param pool the pool to signal if queue was previously empty, else null
1278          * @param internal if caller owns this queue
1279          * @throws RejectedExecutionException if array could not be resized
1280          */
1281         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1282             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1283             if ((a = array) != null && (cap = a.length) > 0 && // else disabled
1284                 task != null) {
1285                 int pk = task.noUserHelp() + 1;             // prev slot offset
1286                 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1287                     top = s + 1;
1288                     long pos = slotOffset(m & s);
1289                     if (!internal)
1290                         U.putReference(a, pos, task);       // inside lock
1291                     else
1292                         U.getAndSetReference(a, pos, task); // fully fenced
1293                     if (room == 0)                          // resize
1294                         growArray(a, cap, s);
1295                 }
1296                 if (!internal)
1297                     unlockPhase();
1298                 if (room < 0)
1299                     throw new RejectedExecutionException("Queue capacity exceeded");
1300                 if ((room == 0 || a[m & (s - pk)] == null) &&
1301                     pool != null)
1302                     pool.signalWork();   // may have appeared empty
1303             }
1304         }
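
              // Worked example of the capacity check above (arithmetic
              // only): with cap = 8, base b = 0, top s = 7, room =
              // (cap - 1) - (s - b) = 0, so the task fits but triggers
              // growArray; room < 0 can occur only after a failed
              // resize, and rejects the push.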
1305 
1306         /**
1307          * Resizes the queue array unless out of memory.
1308          * @param a old array
1309          * @param cap old array capacity
1310          * @param s current top
1311          */
1312         private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
1313             int newCap = cap << 1;
1314             if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1315                 ForkJoinTask<?>[] newArray = null;
1316                 try {
1317                     newArray = new ForkJoinTask<?>[newCap];
1318                 } catch (OutOfMemoryError ex) {
1319                 }
1320                 if (newArray != null) {               // else throw on next push
1321                     int mask = cap - 1, newMask = newCap - 1;
1322                     for (int k = s, j = cap; j > 0; --j, --k) {
1323                         ForkJoinTask<?> u;            // poll old, push to new
1324                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1325                                  a, slotOffset(k & mask), null)) == null)
1326                             break;                    // lost to pollers
1327                         newArray[k & newMask] = u;
1328                     }
1329                     updateArray(newArray);           // fully fenced
1330                 }
1331             }
1332         }
1333 
1334         /**
1335          * Takes next task, if one exists, in order specified by mode,
1336          * so acts as either local-pop or local-poll. Called only by owner.
1337          * @param fifo nonzero if FIFO mode
1338          */
1339         private ForkJoinTask<?> nextLocalTask(int fifo) {
1340             ForkJoinTask<?> t = null;
1341             ForkJoinTask<?>[] a = array;
1342             int b = base, p = top, cap;
1343             if (p - b > 0 && a != null && (cap = a.length) > 0) {
1344                 for (int m = cap - 1, s, nb;;) {
1345                     if (fifo == 0 || (nb = b + 1) == p) {
1346                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347                                  a, slotOffset(m & (s = p - 1)), null)) != null)
1348                             updateTop(s);       // else lost race for only task
1349                         break;
1350                     }
1351                     if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1352                              a, slotOffset(m & b), null)) != null) {
1353                         updateBase(nb);
1354                         break;
1355                     }
1356                     while (b == (b = U.getIntAcquire(this, BASE)))
1357                         Thread.onSpinWait();    // spin to reduce memory traffic
1358                     if (p - b <= 0)
1359                         break;
1360                 }
1361             }
1362             return t;
1363         }
1364 
1365         /**
1366          * Takes next task, if one exists, using configured mode.
1367          * (Always internal, never called for common pool.)
1368          */
1369         final ForkJoinTask<?> nextLocalTask() {
1370             return nextLocalTask(config & FIFO);
1371         }
1372 
1373         /**
1374          * Pops the given task only if it is at the current top.
1375          * @param task the task. Caller must ensure non-null.
1376          * @param internal if caller owns this queue
1377          */
1378         final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1379             boolean taken = false;
1380             ForkJoinTask<?>[] a = array;
1381             int p = top, s = p - 1, cap; long k;
1382             if (a != null && (cap = a.length) > 0 &&
1383                 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1384                 (internal || tryLockPhase())) {
1385                 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1386                     taken = true;
1387                     updateTop(s);
1388                 }
1389                 if (!internal)
1390                     unlockPhase();

1426                     a, slotOffset((cap - 1) & (nb = b + 1)));
1427                 if (base != b)                     // inconsistent
1428                     ;
1429                 else if (t == null) {
1430                     if (u == null && top - b <= 0)
1431                         break;                     // empty
1432                     if (pb == b)
1433                         Thread.onSpinWait();       // stalled
1434                 }
1435                 else if (U.compareAndSetReference(a, k, t, null)) {
1436                     updateBase(nb);
1437                     return t;
1438                 }
1439             }
1440             return null;
1441         }
1442 
1443         // specialized execution methods
1444 
1445         /**
1446          * Runs the given task, as well as remaining local tasks.
1447          */
1448         final void topLevelExec(ForkJoinTask<?> task, int fifo) {
1449             while (task != null) {
1450                 task.doExec();
1451                 task = nextLocalTask(fifo);
1452             }
1453         }
1454 
1455         /**
1456          * Deep form of tryUnpush: Traverses from top and removes and
1457          * runs task if present.
1458          */
1459         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1460             ForkJoinTask<?>[] a = array;
1461             int b = base, p = top, s = p - 1, d = p - b, cap;
1462             if (a != null && (cap = a.length) > 0) {
1463                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1464                     long k; boolean taken;
1465                     ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1466                         a, k = slotOffset(i & m));
1467                     if (t == null)
1468                         break;
1469                     if (t == task) {
1470                         if (!internal && !tryLockPhase())
1471                             break;                  // fail if locked

1561                 }
1562                 else if (!(t instanceof CompletableFuture
1563                            .AsynchronousCompletionTask))
1564                     break;
1565                 if (blocker != null && blocker.isReleasable())
1566                     break;
1567                 if (base == b && t != null &&
1568                     U.compareAndSetReference(a, k, t, null)) {
1569                     updateBase(b + 1);
1570                     t.doExec();
1571                 }
1572             }
1573         }
1574 
1575         // misc
1576 
1577         /**
1578          * Cancels all local tasks. Called only by owner.
1579          */
1580         final void cancelTasks() {
1581             for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
1582                 try {
1583                     t.cancel(false);
1584                 } catch (Throwable ignore) {
1585                 }
1586             }
1587         }
1588 
1589         /**
1590          * Returns true if internal and not known to be blocked.
1591          */
1592         final boolean isApparentlyUnblocked() {
1593             Thread wt; Thread.State s;
1594             return ((wt = owner) != null && (phase & IDLE) != 0 &&
1595                     (s = wt.getState()) != Thread.State.BLOCKED &&
1596                     s != Thread.State.WAITING &&
1597                     s != Thread.State.TIMED_WAITING);
1598         }
1599 
1600         static {
1601             U = Unsafe.getUnsafe();

1763         return false;
1764     }
1765 
1766     /**
1767      * Provides a name for ForkJoinWorkerThread constructor.
1768      */
1769     final String nextWorkerThreadName() {
1770         String prefix = workerNamePrefix;
1771         long tid = incrementThreadIds() + 1L;
1772         if (prefix == null) // commonPool has no prefix
1773             prefix = "ForkJoinPool.commonPool-worker-";
1774         return prefix.concat(Long.toString(tid));
1775     }
1776 
1777     /**
1778      * Finishes initializing and records internal queue.
1779      *
1780      * @param w caller's WorkQueue
1781      */
1782     final void registerWorker(WorkQueue w) {
1783         if (w != null && (runState & STOP) == 0L) {
1784             ThreadLocalRandom.localInit();
1785             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1786             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1787             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1788             long stop = lockRunState() & STOP;
1789             try {
1790                 WorkQueue[] qs; int n;
1791                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1792                     for (int k = n, m = n - 1;  ; id += 2) {
1793                         if (qs[id &= m] == null)
1794                             break;
1795                         if ((k -= 2) <= 0) {
1796                             id |= n;
1797                             break;
1798                         }
1799                     }
1800                     w.phase = id | phaseSeq;    // now publishable
1801                     if (id < n)
1802                         qs[id] = w;
1803                     else {                      // expand

1841             do {} while (c != (c = compareAndExchangeCtl(
1842                                    c, ((RC_MASK & (c - RC_UNIT)) |
1843                                        (TC_MASK & (c - TC_UNIT)) |
1844                                        (LMASK & c)))));
1845         }
1846         if (phase != 0 && w != null) {     // remove index unless terminating
1847             long ns = w.nsteals & 0xffffffffL;
1848             if ((runState & STOP) == 0L) {
1849                 WorkQueue[] qs; int n, i;
1850                 if ((lockRunState() & STOP) == 0L &&
1851                     (qs = queues) != null && (n = qs.length) > 0 &&
1852                     qs[i = phase & SMASK & (n - 1)] == w) {
1853                     qs[i] = null;
1854                     stealCount += ns;      // accumulate steals
1855                 }
1856                 unlockRunState();
1857             }
1858         }
1859         if ((tryTerminate(false, false) & STOP) == 0L &&
1860             phase != 0 && w != null && w.source != DROPPED) {
1861             signalWork();                  // possibly replace
1862             w.cancelTasks();               // clean queue
1863         }
1864         if (ex != null)
1865             ForkJoinTask.rethrow(ex);
1866     }
1867 
1868     /**
1869      * Releases an idle worker, or creates one if not enough exist.
1870      */
1871     final void signalWork() {
1872         int pc = parallelism;
1873         for (long c = ctl;;) {
1874             WorkQueue[] qs = queues;
1875             long ac = (c + RC_UNIT) & RC_MASK, nc;
1876             int sp = (int)c, i = sp & SMASK;
1877             if ((short)(c >>> RC_SHIFT) >= pc)
1878                 break;
1879             if (qs == null)
1880                 break;
1881             if (qs.length <= i)
1882                 break;
1883             WorkQueue w = qs[i], v = null;
1884             if (sp == 0) {
1885                 if ((short)(c >>> TC_SHIFT) >= pc)
1886                     break;
1887                 nc = ((c + TC_UNIT) & TC_MASK);
1888             }
1889             else if ((v = w) == null)
1890                 break;
1891             else
1892                 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1893             if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1894                 if (v == null)
1895                     createWorker();
1896                 else {
1897                     v.phase = sp;
1898                     if (v.parking != 0)
1899                         U.unpark(v.owner);
1900                 }
1901                 break;
1902             }
1903         }
1904     }
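
          // A hedged sketch of the ctl encoding used above (see the
          // RC_/TC_/LMASK constants; exact shift values are defined
          // elsewhere in this file):
          //   high 16 bits: released worker count (RC)
          //   next 16 bits: total worker count (TC)
          //   low  32 bits: phase/id of the top of the idle worker
          //                 stack, linked through stackPred fields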
1905 
1906     /**
1907      * Releases all waiting workers. Called only during shutdown.
1908      */
1909     private void releaseWaiters() {
1910         for (long c = ctl;;) {
1911             WorkQueue[] qs; WorkQueue v; int sp, i;
1912             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1957                 else if ((e & SHUTDOWN) == 0)
1958                     return 0;
1959                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960                     return 0;
1961                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962                     return 1;                             // enable termination
1963                 else
1964                     break;                                // restart
1965             }
1966         }
1967     }
1968 
1969     /**
1970      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971      * See above for explanation.
1972      *
1973      * @param w caller's WorkQueue (may be null on failed initialization)
1974      */
1975     final void runWorker(WorkQueue w) {
1976         if (w != null) {
1977             int phase = w.phase, r = w.stackPred;     // seed from registerWorker
1978             int fifo = w.config & FIFO, nsteals = 0, src = -1;
1979             for (;;) {
1980                 WorkQueue[] qs;
1981                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1982                 if ((runState & STOP) != 0L || (qs = queues) == null)
1983                     break;
1984                 int n = qs.length, i = r, step = (r >>> 16) | 1;
1985                 boolean rescan = false;
1986                 scan: for (int l = n; l > 0; --l, i += step) {  // scan queues
1987                     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1988                     if ((q = qs[j = i & (n - 1)]) != null &&
1989                         (a = q.array) != null && (cap = a.length) > 0) {
1990                         for (int m = cap - 1, pb = -1, b = q.base;;) {
1991                             ForkJoinTask<?> t; long k;
1992                             t = (ForkJoinTask<?>)U.getReferenceAcquire(
1993                                 a, k = slotOffset(m & b));
1994                             if (b != (b = q.base) || t == null ||
1995                                 !U.compareAndSetReference(a, k, t, null)) {
1996                                 if (a[b & m] == null) {
1997                                     if (rescan)           // end of run
1998                                         break scan;
1999                                     if (a[(b + 1) & m] == null &&
2000                                         a[(b + 2) & m] == null) {
2001                                         break;            // probably empty
2002                                     }
2003                                     if (pb == (pb = b)) { // track progress
2004                                         rescan = true;    // stalled; reorder scan
2005                                         break scan;
2006                                     }
2007                                 }
2008                             }
2009                             else {
2010                                 boolean propagate;
2011                                 int nb = q.base = b + 1, prevSrc = src;
2012                                 w.nsteals = ++nsteals;
2013                                 w.source = src = j;       // volatile
2014                                 rescan = true;
2015                                 int nh = t.noUserHelp();
2016                                 if (propagate =
2017                                     (prevSrc != src || nh != 0) && a[nb & m] != null)
2018                                     signalWork();
2019                                 w.topLevelExec(t, fifo);
2020                                 if ((b = q.base) != nb && !propagate)
2021                                     break scan;          // reduce interference
2022                             }
2023                         }
2024                     }
2025                 }
2026                 if (!rescan) {
2027                     if (((phase = deactivate(w, phase)) & IDLE) != 0)
2028                         break;
2029                     src = -1;                            // re-enable propagation
2030                 }
2031             }
2032         }
2033     }
2034 
2035     /**
2036      * Deactivates and if necessary awaits signal or termination.
2037      *
2038      * @param w the worker
2039      * @param phase current phase
2040      * @return current phase, with IDLE set if worker should exit
2041      */
2042     private int deactivate(WorkQueue w, int phase) {
2043         if (w == null)                        // currently impossible
2044             return IDLE;
2045         int p = phase | IDLE, activePhase = phase + (IDLE << 1);
2046         long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
2047         int sp = w.stackPred = (int)pc;       // set ctl stack link
2048         w.phase = p;
2049         if (!compareAndSetCtl(pc, qc))        // try to enqueue
2050             return w.phase = phase;           // back out on possible signal
2051         int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
2052         if (((e = runState) & STOP) != 0L ||
2053             ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
2054             (qs = queues) == null || (n = qs.length) <= 0)
2055             return IDLE;                      // terminating
2056 
2057         for (int prechecks = Math.min(ac, 2), // reactivation threshold
2058              k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
2059             WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
2060             if (w.phase == activePhase)
2061                 return activePhase;
2062             if (--k < 0)
2063                 return awaitWork(w, p);       // block, drop, or exit
2064             if ((q = qs[k & (n - 1)]) == null)
2065                 Thread.onSpinWait();
2066             else if ((a = q.array) != null && (cap = a.length) > 0 &&
2067                      a[q.base & (cap - 1)] != null && --prechecks < 0 &&
2068                      (int)(c = ctl) == activePhase &&
2069                      compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
2070                 return w.phase = activePhase; // reactivate
2071         }
2072     }
2073 
2074     /**
2075      * Awaits signal or termination.
2076      *
2077      * @param w the work queue
2078      * @param p current phase (known to be idle)
2079      * @return current phase, with IDLE set if worker should exit
2080      */
2081     private int awaitWork(WorkQueue w, int p) {
2082         if (w != null) {
2083             ForkJoinWorkerThread t; long deadline;
2084             if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
2085                 t.resetThreadLocals();          // clear before reactivate
2086             if ((ctl & RC_MASK) > 0L)
2087                 deadline = 0L;
2088             else if ((deadline =
2089                       (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
2090                       System.currentTimeMillis()) == 0L)
2091                 deadline = 1L;                 // avoid zero
2092             int activePhase = p + IDLE;
2093             if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
2094                 LockSupport.setCurrentBlocker(this);
2095                 w.parking = 1;                 // enable unpark
2096                 while ((p = w.phase) != activePhase) {
2097                     boolean trimmable = false; int trim;
2098                     Thread.interrupted();      // clear status
2099                     if ((runState & STOP) != 0L)
2100                         break;
2101                     if (deadline != 0L) {
2102                         if ((trim = tryTrim(w, p, deadline)) > 0)
2103                             break;
2104                         else if (trim < 0)
2105                             deadline = 0L;
2106                         else
2107                             trimmable = true;
2108                     }
2109                     U.park(trimmable, deadline);
2110                 }
2111                 w.parking = 0;
2112                 LockSupport.setCurrentBlocker(null);
2113             }
2114         }
2115         return p;
2116     }
2117 
2118     /**
2119      * Tries to remove and deregister worker after timeout, and release
2120      * another to do the same.
2121      * @return > 0: trimmed, < 0: not trimmable, else 0
2122      */
2123     private int tryTrim(WorkQueue w, int phase, long deadline) {
2124         long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2125         if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2126             stat = -1;                      // no longer ctl top
2127         else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2128             stat = 0;                       // spurious wakeup
2129         else if (!compareAndSetCtl(
2130                      c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2131                                (TC_MASK & (c - TC_UNIT)))))
2132             stat = -1;                      // lost race to signaller
2133         else {
2134             stat = 1;
2135             w.source = DROPPED;
2136             w.phase = activePhase;
2137             if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2138                 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2139                 compareAndSetCtl(           // try to wake up next waiter
2140                     nc, ((UMASK & (nc + RC_UNIT)) |
2141                          (nc & TC_MASK) | (v.stackPred & LMASK)))) {
2142                 v.source = INVALID_ID;      // enable cascaded timeouts
2143                 v.phase = vp;
2144                 U.unpark(v.owner);
2145             }
2146         }
2147         return stat;
2148     }
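
          // Cascade sketch: a trimmed worker wakes the next stack
          // waiter with source set to INVALID_ID, so that waiter uses
          // the short TIMEOUT_SLOP deadline in awaitWork rather than
          // keepAlive, letting idle spares time out one after another.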
2149 
2150     /**
2151      * Scans for and returns a polled task, if available.  Used only
2152      * for untracked polls. Begins scan at a random index to avoid
2153      * systematic unfairness.
2154      *
2155      * @param submissionsOnly if true, only scan submission queues
2156      */
2157     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
2158         if ((runState & STOP) == 0L) {
2159             WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
2160             int r = ThreadLocalRandom.nextSecondarySeed();
2161             if (submissionsOnly)                 // even indices only
2162                 r &= ~1;
2163             int step = (submissionsOnly) ? 2 : 1;
2164             if ((qs = queues) != null && (n = qs.length) > 0) {
2165                 for (int i = n; i > 0; i -= step, r += step) {
2166                     if ((q = qs[r & (n - 1)]) != null &&
2167                         (t = q.poll()) != null)

2544         else
2545             return 0;
2546     }
2547 
2548     /**
2549      * Gets and removes a local or stolen task for the given worker.
2550      *
2551      * @return a task, if available
2552      */
2553     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554         ForkJoinTask<?> t;
2555         if (w == null || (t = w.nextLocalTask()) == null)
2556             t = pollScan(false);
2557         return t;
2558     }
2559 
2560     // External operations
2561 
2562     /**
2563      * Finds and locks a WorkQueue for an external submitter, or
2564      * throws RejectedExecutionException if shutdown or terminating.
2565      * @param r current ThreadLocalRandom.getProbe() value
2566      * @param rejectOnShutdown true if RejectedExecutionException
2567      *        should be thrown when shutdown (else only if terminating)
2568      */
2569     private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
2570         int reuse;                                   // nonzero if prefer create
2571         if ((reuse = r) == 0) {
2572             ThreadLocalRandom.localInit();           // initialize caller's probe
2573             r = ThreadLocalRandom.getProbe();
2574         }
2575         for (int probes = 0; ; ++probes) {
2576             int n, i, id; WorkQueue[] qs; WorkQueue q;
2577             if ((qs = queues) == null)
2578                 break;
2579             if ((n = qs.length) <= 0)
2580                 break;
2581             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582                 WorkQueue w = new WorkQueue(null, id, 0, false);
2583                 w.phase = id;
2584                 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585                                   rejectOnShutdown);
2586                 if (!reject && queues == qs && qs[i] == null)
2587                     q = qs[i] = w;                   // else lost race to install
2588                 unlockRunState();
2589                 if (q != null)
2590                     return q;
2591                 if (reject)
2592                     break;
2593                 reuse = 0;
2594             }
2595             if (reuse == 0 || !q.tryLockPhase()) {   // move index
2596                 if (reuse == 0) {
2597                     if (probes >= n >> 1)
2598                         reuse = r;                   // stop preferring free slot
2599                 }
2600                 else if (q != null)
2601                     reuse = 0;                       // probe on collision
2602                 r = ThreadLocalRandom.advanceProbe(r);
2603             }
2604             else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605                 q.unlockPhase();                     // check while q lock held
2606                 break;
2607             }
2608             else
2609                 return q;
2610         }
2611         throw new RejectedExecutionException();
2612     }
2613 
2614     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617             (wt = (ForkJoinWorkerThread)t).pool == this) {
2618             internal = true;
2619             q = wt.workQueue;
2620         }
2621         else {                     // find and lock queue
2622             internal = false;
2623             q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624         }
2625         q.push(task, signalIfEmpty ? this : null, internal);
2626         return task;
2627     }
2628 
2629     /**
2630      * Returns queue for an external submission, bypassing call to
2631      * submissionQueue if already established and unlocked.
2632      */
2633     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634         WorkQueue[] qs; WorkQueue q; int n;
2635         int r = ThreadLocalRandom.getProbe();
2636         return (((qs = queues) != null && (n = qs.length) > 0 &&
2637                  (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638                  q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639     }
2640 
2641     /**
2642      * Returns queue for an external thread, if one exists that has
2643      * possibly ever submitted to the given pool (nonzero probe), or
2644      * null if none.
2645      */
2646     static WorkQueue externalQueue(ForkJoinPool p) {
2647         WorkQueue[] qs; int n;
2648         int r = ThreadLocalRandom.getProbe();
2649         return (p != null && (qs = p.queues) != null &&
2650                 (n = qs.length) > 0 && r != 0) ?
2651             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2652     }
2653 
2654     /**
2655      * Returns external queue for common pool.
2656      */
2657     static WorkQueue commonQueue() {
2658         return externalQueue(common);
2659     }
2660 

 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule whereby task producers
 555      * and consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
 561      *
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  Method signalWork is invoked in two
 564      * cases: (1) when a task is pushed onto an empty queue, and (2)
 565      * when a worker takes a top-level task from a queue holding more
 566      * tasks. Together, these suffice in O(log(#threads)) steps to
 567      * fully activate with at least enough workers, and ideally no
 568      * more than required.  This ideal is unattainable: Callers do not
 569      * know whether another worker will finish its current task and
 570      * poll for others without need of a signal (which is otherwise an
 571      * advantage of work-stealing vs other schemes), and also must
 572      * conservatively estimate the triggering conditions of emptiness
 573      * or non-emptiness; all of which usually cause more activations
 574      * than necessary (see below). (Method signalWork is also used as
 575      * a failsafe in case of thread failures in deregisterWorker, to
 576      * activate or create a new worker as a replacement.)
 577      *
 578      * Top-Level scheduling
 579      * ====================
 580      *
 581      * Scanning. Method runWorker performs top-level scanning for (and
 582      * execution of) tasks by polling a pseudo-random permutation of
 583      * the array (by starting at a given index, and using a constant
 584      * cyclically exhaustive stride).  It uses the same basic polling
 585      * method as WorkQueue.poll(), but restarts with a different
 586      * permutation on each rescan.  The pseudorandom generator need
 587      * not have high-quality statistical properties in the long
 588      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 589      * from ThreadLocalRandom probes, which are cheap and suffice.
 590      *
 591      * Deactivation. When no tasks are found by a worker in runWorker,
 592      * it invokes deactivate, which first moves the worker to an IDLE
 593      * phase.  Avoiding missed signals during deactivation requires a
 594      * (conservative) rescan, reactivating if there may be tasks to
 595      * poll. Because idle workers are often not yet blocked (parked),
 596      * we use a WorkQueue field to advertise that a waiter actually
 597      * needs unparking upon signal.
 598      *
 599      * When tasks are constructed as (recursive) DAGs, top-level
 600      * scanning is usually infrequent, and doesn't encounter most
 601      * of the following problems addressed by runWorker and awaitWork:
 602      *
 603      * Locality. Polls are organized into "runs", continuing until
 604      * empty or contended, while also minimizing interference by
 605      * postponing bookkeeping to ends of runs. This may reduce
 606      * fairness.
 607      *
 608      * Contention. When many workers try to poll few queues, they
 609      * often collide, generating CAS failures and disrupting locality
 610      * of workers already running their tasks. This also leads to
 611      * stalls when tasks cannot be taken because other workers have
 612      * not finished poll operations, which is detected by reading
 613      * ahead in queue arrays. In both cases, workers restart scans in a
 614      * way that approximates randomized backoff.
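          *
          * In sketch form, the read-ahead check after a failed poll
          * (abbreviated from runWorker; b is base, m the slot mask):
          *
          *   if (a[b & m] == null) {            // taken, not refilled
          *       if (a[(b + 1) & m] == null &&
          *           a[(b + 2) & m] == null)
          *           break;                     // probably empty
          *       if (pb == (pb = b)) {          // no progress on base
          *           rescan = true;             // stalled; reorder scan
          *           break scan;
          *       }
          *   }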
 615      *
 616      * Oversignalling. When many short top-level tasks are present in
 617      * a small number of queues, the above signalling strategy may
 618      * activate many more workers than needed, worsening locality and
 619      * contention problems, while also generating more global
 620      * contention (field ctl is CASed on every activation and
 621      * deactivation). We filter out (both in runWorker and
 622      * signalWork) attempted signals that are surely not needed
 623      * because the signalled tasks are already taken.
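          *
          * Concretely (both checks appear in the code elsewhere in
          * this file): push signals only when its queue appeared empty
          * beforehand, and a worker that just took a task further
          * signals only if the next slot it read is still nonnull;
          * each drops signals whose would-be targets are already gone.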
 624      *
 625      * Shutdown and Quiescence
 626      * =======================
 627      *
 628      * Quiescence. Workers scan looking for work, giving up when they
 629      * don't find any, without being sure that none are available.
 630      * However, some required functionality relies on consensus about
 631      * quiescence (also termination, discussed below). The count
 632      * fields in ctl allow accurate discovery of states in which all
 633      * workers are idle.  However, because external (asynchronous)
 634      * submitters are not part of this vote, these mechanisms
 635      * themselves do not guarantee that the pool is in a quiescent
 636      * state with respect to methods isQuiescent, shutdown (which
 637      * begins termination when quiescent), helpQuiesce, and indirectly
 638      * others including tryCompensate. Method quiescent() is used in
 639      * all of these contexts. It provides checks that all workers are
 640      * idle and there are no submissions that they could poll if they
 641      * were not idle, retrying on inconsistent reads of queues and
 642      * using the runState seqLock to retry on queue array updates.
 643      * (It also reports quiescence if the pool is terminating.) A true
 644      * report means only that there was a moment at which quiescence
 645      * held.  False negatives are inevitable (for example when queue
 646      * indices lag updates, as described above), which is accommodated

 856      * ====================
 857      *
 858      * Regular ForkJoinTasks manage task cancellation (method cancel)
 859      * independently from the interrupted status of threads running
 860      * tasks.  Interrupts are issued internally only while
 861      * terminating, to wake up workers and cancel queued tasks.  By
 862      * default, interrupts are cleared only when necessary to ensure
 863      * that calls to LockSupport.park do not loop indefinitely (park
 864      * returns immediately if the current thread is interrupted).
 865      *
 866      * To comply with ExecutorService specs, we use subclasses of
 867      * abstract class InterruptibleTask for tasks that require
 868      * stronger interruption and cancellation guarantees.  External
 869      * submitters never run these tasks, even if in the common pool
 870      * (as indicated by ForkJoinTask.noUserHelp status bit).
 871      * InterruptibleTasks include a "runner" field (implemented
 872      * similarly to FutureTask) to support cancel(true).  Upon pool
 873      * shutdown, runners are interrupted so they can cancel. Since
 874      * external joining callers never run these tasks, they must await
 875      * cancellation by others, which can occur along several different
 876      * paths.
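     *
     * A minimal sketch of the FutureTask-like runner idiom (the names
     * below are illustrative, not the actual InterruptibleTask fields):
     *
     *   final class Sketch<V> {
     *       volatile Thread runner;       // thread executing the body
     *       volatile boolean cancelled;
     *       boolean cancel(boolean mayInterruptIfRunning) {
     *           cancelled = true;         // record status first
     *           Thread t;
     *           if (mayInterruptIfRunning && (t = runner) != null)
     *               t.interrupt();        // unblock the running thread
     *           return true;
     *       }
     *   }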
 877      *
 878      * Across these APIs, rules for reporting exceptions for tasks
 879      * with results accessed via join() differ from those via get(),
 880      * which differ from those invoked using pool submit methods by
 881      * non-workers (which comply with Future.get() specs). Internal
 882      * usages of ForkJoinTasks ignore interrupted status when executing
 883      * or awaiting completion.  Otherwise, reporting task results or
 884      * exceptions is preferred to throwing InterruptedExceptions,
 885      * which are in turn preferred to timeouts. Similarly, completion
 886      * status is preferred to reporting cancellation.  Cancellation is
 887      * reported as an unchecked exception by join(), and by worker
 888      * calls to get(), but is otherwise wrapped in a (checked)
 889      * ExecutionException.
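     *
     * For example, for a task body throwing an unchecked exception, a
     * non-worker caller (pool here is any ForkJoinPool) sees:
     *
     *   ForkJoinTask<?> t = pool.submit((Runnable) () -> {
     *       throw new IllegalStateException(); });
     *   t.join();  // rethrows the IllegalStateException (unchecked)
     *   t.get();   // throws a (checked) ExecutionException wrapping it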
 890      *
 891      * Worker Threads cannot be VirtualThreads, as enforced by
 892      * requiring ForkJoinWorkerThreads in factories.  There are
 893      * several constructions relying on this.  However as of this
 894      * writing, virtual thread bodies are by default run as some form
 895      * of InterruptibleTask.
 896      *

 923      * the placement of their fields. Cache misses and contention due
 924      * to false-sharing have been observed to slow down some programs
 925      * by more than a factor of four. Effects may vary across initial
 926      * memory configurations, applications, and different garbage
 927      * collectors and GC settings, so there is no perfect solution.
 928      * Too much isolation may generate more cache misses in common
 929      * cases (because some fields and slots are usually read at the
 930      * same time). The @Contended annotation provides only rough
 931      * control (for good reason). Similarly for relying on fields
 932      * being placed in size-sorted declaration order.
 933      *
 934      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 935      * most false-sharing misses with respect to other fields. Also,
 936      * ForkJoinPool fields are ordered such that fields less prone to
 937      * contention effects are first, offsetting those that otherwise
 938      * would be, while also reducing total footprint vs using
 939      * multiple @Contended regions, which tends to slow down
 940      * less-contended applications. To help arrange this, some
 941      * non-reference fields are declared as "long" even when ints or
 942      * shorts would suffice.  For class WorkQueue, an
 943      * embedded @Contended isolates the very busy top index, and
 944      * another segregates status and bookkeeping fields written
 945      * (mostly) by owners, that otherwise interfere with reading
 946      * array, top, and base fields. There are other variables commonly
 947      * contributing to false-sharing-related performance issues
 948      * (including fields of class Thread), but we can't do much about
 949      * this except try to minimize access.
 950      *
 951      * Initial sizing and resizing of WorkQueue arrays is an even more
 952      * delicate tradeoff because the best strategy systematically
 953      * varies across garbage collectors. Small arrays are better for
 954      * locality and reduce GC scan time, but large arrays reduce both
 955      * direct false-sharing and indirect cases due to GC bookkeeping
 956      * (cardmarks etc), and reduce the number of resizes, which are
 957      * not especially fast because they require atomic transfers.
 958      * Currently, arrays are initialized to be just large enough to
 959      * avoid resizing in most tree-structured tasks, but grow rapidly
 960      * until large.  (Maintenance note: any changes in fields, queues,
 961      * or their uses, or JVM layout policies, must be accompanied by
 962      * re-evaluation of these placement and sizing decisions.)
 963      *
 964      * Style notes
 965      * ===========
 966      *
 967      * Memory ordering relies mainly on atomic operations (CAS,
 968      * getAndSet, getAndAdd) along with moded accesses. These use
 969      * jdk-internal Unsafe for atomics and special memory modes,
 970      * rather than VarHandles, to avoid initialization dependencies in
 971      * other jdk components that require early parallelism.  This can
 972      * be awkward and ugly, but also reflects the need to control
 973      * outcomes across the unusual cases that arise in very racy code
 974      * with very few invariants. All atomic task slot updates use
 975      * Unsafe operations requiring offset positions, not indices, as
 976      * computed by method slotOffset. All fields are read into locals
 977      * before use, and null-checked if they are references, even if
 978      * they can never be null under current usages. Usually,
 979      * computations (held in local variables) are defined as soon as
 980      * logically enabled, sometimes to convince compilers that they
 981      * may be performed despite memory ordering constraints.  Array
 982      * accesses using masked indices include checks (that are always

1025      */
1026     static final long DEFAULT_KEEPALIVE = 60_000L;
1027 
1028     /**
1029      * Undershoot tolerance for idle timeouts, also serving as the
1030      * minimum allowed timeout value.
1031      */
1032     static final long TIMEOUT_SLOP = 20L;
1033 
1034     /**
1035      * The default value for common pool maxSpares.  Overridable using
1036      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1037      * system property.  The default value is far in excess of normal
1038      * requirements, but also far short of maximum capacity and typical OS
1039      * thread limits, so it allows JVMs to catch misuse/abuse before
1040      * running out of resources needed to do so.
1041      */
1042     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1043 
1044     /**
1045      * Initial capacity of work-stealing queue array.
1046      * Must be a power of two, at least 2. See above.
1047      */
1048     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1049 
1050     // conversions among short, int, long
1051     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1052     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1053     static final long UMASK           = ~LMASK;      // upper 32 bits
1054 
1055     // masks and sentinels for queue indices
1056     static final int MAX_CAP          = 0x7fff;   // max # workers
1057     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1058     static final int INVALID_ID       = 0x4000;   // unused external queue id
1059 
1060     // pool.runState bits
1061     static final long STOP            = 1L <<  0;   // terminating
1062     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1063     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1064     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1065     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1066 
1067     // spin/sleep limits for runState locking and elsewhere
1068     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1069     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos
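    // A sketch of the seqlock convention implied by RS_LOCK (assuming
    // the usual single-lock-bit scheme): bits below RS_LOCK are state
    // flags, RS_LOCK is the lock bit, and the bits above it version
    // updates; adding RS_LOCK on release clears the lock bit and
    // carries into the version, so readers can detect intervening
    // updates by comparing two reads.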

1161     static final class DefaultForkJoinWorkerThreadFactory
1162         implements ForkJoinWorkerThreadFactory {
1163         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1164             return ((pool.workerNamePrefix == null) ? // is commonPool
1165                     new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1166                     new ForkJoinWorkerThread(null, pool, true, false));
1167         }
1168     }
1169 
1170     /**
1171      * Queues supporting work-stealing as well as external task
1172      * submission. See above for descriptions and algorithms.
1173      */
1174     static final class WorkQueue {
1175         // fields declared in order of their likely layout on most VMs
1176         final ForkJoinWorkerThread owner; // null if shared
1177         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
1178         int base;                  // index of next slot for poll
1179         final int config;          // mode bits
1180 
1181         @jdk.internal.vm.annotation.Contended("t") // segregate
1182         int top;                   // index of next slot for push
1183 
1184         // fields otherwise causing more unnecessary false-sharing cache misses
1185         @jdk.internal.vm.annotation.Contended("w")
1186         volatile int phase;        // versioned active status
1187         @jdk.internal.vm.annotation.Contended("w")
1188         int stackPred;             // pool stack (ctl) predecessor link
1189         @jdk.internal.vm.annotation.Contended("w")
1190         volatile int parking;      // nonzero if parked in awaitWork
1191         @jdk.internal.vm.annotation.Contended("w")
1192         volatile int source;       // source queue id (or DROPPED)
1193         @jdk.internal.vm.annotation.Contended("w")
1194         int nsteals;               // number of steals from other queues
1195 
1196         // Support for atomic operations
1197         private static final Unsafe U;
1198         private static final long PHASE;
1199         private static final long BASE;
1200         private static final long TOP;
1201         private static final long ARRAY;
1202 
1203         final void updateBase(int v) {
1204             U.putIntVolatile(this, BASE, v);
1205         }
1206         final void updateTop(int v) {
1207             U.putIntOpaque(this, TOP, v);
1208         }
1209         final void updateArray(ForkJoinTask<?>[] a) {
1210             U.getAndSetReference(this, ARRAY, a);
1211         }
1212         final void unlockPhase() {
1213             U.getAndAddInt(this, PHASE, IDLE);
1214         }
1215         final boolean tryLockPhase() {    // seqlock acquire
1216             int p;
1217             return (((p = phase) & IDLE) != 0 &&
1218                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1219         }
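        // Note (a sketch of the convention, not added behavior): phase
        // doubles as a seqlock for external queues. A set IDLE bit
        // means unlocked; tryLockPhase adds IDLE, clearing the bit and
        // carrying into the version bits above it, and unlockPhase
        // adds IDLE again to restore the unlocked state.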
1220 
1221         /**
1222          * Constructor. For internal queues, most fields are initialized
1223          * upon thread start in pool.registerWorker.
1224          */
1225         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1226                   boolean clearThreadLocals) {
1227             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1228             if ((this.owner = owner) == null) {
1229                 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1230                 phase = id | IDLE;
1231             }
1232         }
1233 
1234         /**
1235          * Returns an exportable index (used by ForkJoinWorkerThread).
1236          */
1237         final int getPoolIndex() {
1238             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1239         }
1240 
1241         /**
1242          * Returns the approximate number of tasks in the queue.
1243          */
1244         final int queueSize() {
1245             int unused = phase;             // for ordering effect
1246             return Math.max(top - base, 0); // ignore transient negative
1247         }
1248 
1249         /**
1250          * Pushes a task. Called only by owner or if already locked.
1251          *
1252          * @param task the task; no-op if null
1253          * @param pool the pool to signal if was previously empty, else null
1254          * @param internal if caller owns this queue
1255          * @throws RejectedExecutionException if array could not be resized
1256          */
1257         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1258             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a, na;
1259             if ((a = array) != null && (cap = a.length) > 0) { // else disabled
1260                 int k = (m = cap - 1) & s;
1261                 if ((room = m - (s - b)) >= 0) {
1262                     top = s + 1;
1263                     long pos = slotOffset(k);
1264                     if (!internal)
1265                         U.putReference(a, pos, task);       // inside lock
1266                     else
1267                         U.getAndSetReference(a, pos, task); // fully fenced
1268                     if (room == 0 && (na = growArray(a, cap, s)) != null)
1269                         k = ((a = na).length - 1) & s;      // resize
1270                 }
1271                 if (!internal)
1272                     unlockPhase();
1273                 if (room < 0)
1274                     throw new RejectedExecutionException("Queue capacity exceeded");
1275                 if (pool != null &&
1276                     (room == 0 ||
1277                      U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
1278                     pool.signalWork(a, k);    // may have appeared empty
1279             }
1280         }
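        // Occupancy math above, for power-of-two cap: size = top - base
        // and room = (cap - 1) - size, so room == 0 means this push
        // takes the last free slot (triggering growArray), and
        // room < 0 (possible only after a failed resize) rejects.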
1281 
1282         /**
1283          * Resizes the queue array unless out of memory.
1284          * @param a old array
1285          * @param cap old array capacity
1286          * @param s current top
1287          * @return new array, or null on failure
1288          */
1289         private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
1290             int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
1291             ForkJoinTask<?>[] newArray = null;
1292             if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1293                 try {
1294                     newArray = new ForkJoinTask<?>[newCap];
1295                 } catch (OutOfMemoryError ex) {
1296                 }
1297                 if (newArray != null) {               // else throw on next push
1298                     int mask = cap - 1, newMask = newCap - 1;
1299                     for (int k = s, j = cap; j > 0; --j, --k) {
1300                         ForkJoinTask<?> u;            // poll old, push to new
1301                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1302                                  a, slotOffset(k & mask), null)) == null)
1303                             break;                    // lost to pollers
1304                         newArray[k & newMask] = u;
1305                     }
1306                     updateArray(newArray);           // fully fenced
1307                 }
1308             }
1309             return newArray;
1310         }
1311 
1312         /**
1313          * Takes next task, if one exists, in lifo order.
1314          */
1315         private ForkJoinTask<?> localPop() {
1316             ForkJoinTask<?> t = null;
1317             int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
1318             if ((a = array) != null && (cap = a.length) > 0 &&
1319                 U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
1320                 (t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
1321                 updateTop(s);
1322             return t;
1323         }
1324 
1325         /**
1326          * Takes next task, if one exists, in fifo order.
1327          */
1328         private ForkJoinTask<?> localPoll() {
1329             ForkJoinTask<?> t = null;
1330             int p = top, cap; ForkJoinTask<?>[] a;
1331             if ((a = array) != null && (cap = a.length) > 0) {
1332                 for (int b = base; p - b > 0; ) {
1333                     int nb = b + 1;
1334                     long k = slotOffset((cap - 1) & b);
1335                     if (U.getReference(a, k) == null) {
1336                         if (nb == p)
1337                             break;          // else base is lagging
1338                         while (b == (b = U.getIntAcquire(this, BASE)))
1339                             Thread.onSpinWait(); // spin to reduce memory traffic
1340                     }
1341                     else if ((t = (ForkJoinTask<?>)
1342                               U.getAndSetReference(a, k, null)) != null) {
1343                         updateBase(nb);
1344                         break;
1345                     }
1346                     else
1347                         b = base;
1348                 }
1349             }
1350             return t;
1351         }
1352 
1353         /**
1354          * Takes next task, if one exists, using configured mode.
1355          */
1356         final ForkJoinTask<?> nextLocalTask() {
1357             return (config & FIFO) == 0 ? localPop() : localPoll();
1358         }
1359 
1360         /**
1361          * Pops the given task only if it is at the current top.
1362          * @param task the task. Caller must ensure non-null.
1363          * @param internal if caller owns this queue
1364          */
1365         final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1366             boolean taken = false;
1367             ForkJoinTask<?>[] a = array;
1368             int p = top, s = p - 1, cap; long k;
1369             if (a != null && (cap = a.length) > 0 &&
1370                 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1371                 (internal || tryLockPhase())) {
1372                 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1373                     taken = true;
1374                     updateTop(s);
1375                 }
1376                 if (!internal)
1377                     unlockPhase();

1413                     a, slotOffset((cap - 1) & (nb = b + 1)));
1414                 if (base != b)                     // inconsistent
1415                     ;
1416                 else if (t == null) {
1417                     if (u == null && top - b <= 0)
1418                         break;                     // empty
1419                     if (pb == b)
1420                         Thread.onSpinWait();       // stalled
1421                 }
1422                 else if (U.compareAndSetReference(a, k, t, null)) {
1423                     updateBase(nb);
1424                     return t;
1425                 }
1426             }
1427             return null;
1428         }
1429 
1430         // specialized execution methods
1431 
1432         /**
1433          * Runs the given task, as well as remaining local tasks, and
1434          * those from the given queue that can be polled without interference.
1435          */
1436         final void topLevelExec(ForkJoinTask<?> task, WorkQueue q, int fifo) {
1437             if (task != null && q != null) {
1438                 int stolen = 1;
1439                 for (;;) {
1440                     task.doExec();
1441                     task = null;
1442                     int p = top, cap; ForkJoinTask<?>[] a;
1443                     if ((a = array) == null || (cap = a.length) <= 0)
1444                         break;
1445                     if (fifo == 0) {  // specialized localPop
1446                         int s = p - 1;
1447                         long k = slotOffset((cap - 1) & s);
1448                         if (U.getReference(a, k) != null &&
1449                             (task = (ForkJoinTask<?>)
1450                              U.getAndSetReference(a, k, null)) != null)
1451                             top = s;
1452                     } else {         // specialized localPoll
1453                         for (int b = base; p - b > 0; ) {
1454                             int nb = b + 1;
1455                             long k = slotOffset((cap - 1) & b);
1456                             if (U.getReference(a, k) != null &&
1457                                 (task = (ForkJoinTask<?>)
1458                                  U.getAndSetReference(a, k, null)) != null) {
1459                                 base = nb;
1460                                 break;
1461                             }
1462                             if (nb == p)
1463                                 break;
1464                             while (b == (b = U.getIntAcquire(this, BASE)))
1465                                 Thread.onSpinWait();
1466                         }
1467                     }
1468                     if (task == null) { // one-shot steal attempt
1469                         int qb = q.base, qcap; ForkJoinTask<?>[] qa; long bp;
1470                         if ((qa = q.array) != null && (qcap = qa.length) > 0 &&
1471                             (task = (ForkJoinTask<?>)U.getReferenceAcquire(
1472                                 qa, bp = slotOffset((qcap - 1) & qb))) != null &&
1473                             q.base == qb &&
1474                             U.compareAndSetReference(qa, bp, task, null)) {
1475                             q.base = qb + 1;
1476                             ++stolen;
1477                         }
1478                         else
1479                             break;
1480                     }
1481                 }
1482                 nsteals += stolen;
1483                 ForkJoinWorkerThread o;
1484                 if ((config & CLEAR_TLS) != 0 && (o = owner) != null)
1485                     o.resetThreadLocals();
1486             }
1487         }
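        // Note: this is the "run" structure described in the Locality
        // section above: keep executing local tasks plus one-shot
        // steals from q until none remain, deferring the nsteals
        // bookkeeping (and any TLS reset) to the end of the run.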
1488 
1489         /**
1490          * Deep form of tryUnpush: Traverses from top and removes and
1491          * runs task if present.
1492          */
1493         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1494             ForkJoinTask<?>[] a = array;
1495             int b = base, p = top, s = p - 1, d = p - b, cap;
1496             if (a != null && (cap = a.length) > 0) {
1497                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1498                     long k; boolean taken;
1499                     ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1500                         a, k = slotOffset(i & m));
1501                     if (t == null)
1502                         break;
1503                     if (t == task) {
1504                         if (!internal && !tryLockPhase())
1505                             break;                  // fail if locked

1595                 }
1596                 else if (!(t instanceof CompletableFuture
1597                            .AsynchronousCompletionTask))
1598                     break;
1599                 if (blocker != null && blocker.isReleasable())
1600                     break;
1601                 if (base == b && t != null &&
1602                     U.compareAndSetReference(a, k, t, null)) {
1603                     updateBase(b + 1);
1604                     t.doExec();
1605                 }
1606             }
1607         }
1608 
1609         // misc
1610 
1611         /**
1612          * Cancels all local tasks. Called only by owner.
1613          */
1614         final void cancelTasks() {
1615             for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
1616                 try {
1617                     t.cancel(false);
1618                 } catch (Throwable ignore) {
1619                 }
1620             }
1621         }
1622 
1623         /**
1624          * Returns true if internal and not known to be blocked.
1625          */
1626         final boolean isApparentlyUnblocked() {
1627             Thread wt; Thread.State s;
1628             return ((wt = owner) != null && (phase & IDLE) != 0 &&
1629                     (s = wt.getState()) != Thread.State.BLOCKED &&
1630                     s != Thread.State.WAITING &&
1631                     s != Thread.State.TIMED_WAITING);
1632         }
1633 
1634         static {
1635             U = Unsafe.getUnsafe();

1797         return false;
1798     }
1799 
1800     /**
1801      * Provides a name for ForkJoinWorkerThread constructor.
1802      */
1803     final String nextWorkerThreadName() {
1804         String prefix = workerNamePrefix;
1805         long tid = incrementThreadIds() + 1L;
1806         if (prefix == null) // commonPool has no prefix
1807             prefix = "ForkJoinPool.commonPool-worker-";
1808         return prefix.concat(Long.toString(tid));
1809     }
1810 
1811     /**
1812      * Finishes initializing and records internal queue.
1813      *
1814      * @param w caller's WorkQueue
1815      */
1816     final void registerWorker(WorkQueue w) {
1817         if (w != null) {
1818             w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1819             ThreadLocalRandom.localInit();
1820             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1821             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1822             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1823             long stop = lockRunState() & STOP;
1824             try {
1825                 WorkQueue[] qs; int n;
1826                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1827                     for (int k = n, m = n - 1;  ; id += 2) {
1828                         if (qs[id &= m] == null)
1829                             break;
1830                         if ((k -= 2) <= 0) {
1831                             id |= n;
1832                             break;
1833                         }
1834                     }
1835                     w.phase = id | phaseSeq;    // now publishable
1836                     if (id < n)
1837                         qs[id] = w;
1838                     else {                      // expand

1876             do {} while (c != (c = compareAndExchangeCtl(
1877                                    c, ((RC_MASK & (c - RC_UNIT)) |
1878                                        (TC_MASK & (c - TC_UNIT)) |
1879                                        (LMASK & c)))));
1880         }
1881         if (phase != 0 && w != null) {     // remove index unless terminating
1882             long ns = w.nsteals & 0xffffffffL;
1883             if ((runState & STOP) == 0L) {
1884                 WorkQueue[] qs; int n, i;
1885                 if ((lockRunState() & STOP) == 0L &&
1886                     (qs = queues) != null && (n = qs.length) > 0 &&
1887                     qs[i = phase & SMASK & (n - 1)] == w) {
1888                     qs[i] = null;
1889                     stealCount += ns;      // accumulate steals
1890                 }
1891                 unlockRunState();
1892             }
1893         }
1894         if ((tryTerminate(false, false) & STOP) == 0L &&
1895             phase != 0 && w != null && w.source != DROPPED) {
1896             w.cancelTasks();               // clean queue
1897             signalWork(null, 0);           // possibly replace
1898         }
1899         if (ex != null)
1900             ForkJoinTask.rethrow(ex);
1901     }
1902 
1903     /**
1904      * Releases an idle worker, or creates one if not enough exist,
1905      * giving up if array a is non-null and the task at a[k] is already taken.
1906      */
1907     final void signalWork(ForkJoinTask<?>[] a, int k) {
1908         int pc = parallelism;
1909         for (long c = ctl;;) {
1910             WorkQueue[] qs = queues;
1911             long ac = (c + RC_UNIT) & RC_MASK, nc;
1912             int sp = (int)c, i = sp & SMASK;
1913             if ((short)(c >>> RC_SHIFT) >= pc)
1914                 break;
1915             if (qs == null)
1916                 break;
1917             if (qs.length <= i)
1918                 break;
1919             WorkQueue w = qs[i], v = null;
1920             if (sp == 0) {
1921                 if ((short)(c >>> TC_SHIFT) >= pc)
1922                     break;
1923                 nc = ((c + TC_UNIT) & TC_MASK) | ac;
1924             }
1925             else if ((v = w) == null)
1926                 break;
1927             else
1928                 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
1929             if (a != null && k < a.length && k >= 0 && a[k] == null)
1930                 break;
1931             if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
1932                 if (v == null)
1933                     createWorker();
1934                 else {
1935                     v.phase = sp;
1936                     if (v.parking != 0)
1937                         U.unpark(v.owner);
1938                 }
1939                 break;
1940             }
1941         }
1942     }
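    // Sketch of the idle-worker stack encoding used above: the low 32
    // bits of ctl hold the phase (sp) of the top idle worker, whose
    // stackPred links to the next; a release pops by CASing ctl to
    // ((v.stackPred & LMASK) | updated counts), then writes v.phase
    // and unparks v only if it advertised parking.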
1943 
1944     /**
1945      * Releases all waiting workers. Called only during shutdown.
1946      */
1947     private void releaseWaiters() {
1948         for (long c = ctl;;) {
1949             WorkQueue[] qs; WorkQueue v; int sp, i;
1950             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1951                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1995                 else if ((e & SHUTDOWN) == 0)
1996                     return 0;
1997                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1998                     return 0;
1999                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
2000                     return 1;                             // enable termination
2001                 else
2002                     break;                                // restart
2003             }
2004         }
2005     }
2006 
2007     /**
2008      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
2009      * See above for explanation.
2010      *
2011      * @param w caller's WorkQueue (may be null on failed initialization)
2012      */
2013     final void runWorker(WorkQueue w) {
2014         if (w != null) {
2015             WorkQueue[] qs;
2016             int phase = w.phase, r = w.stackPred;         // seed from registerWorker
2017             int fifo = (int)config & FIFO, rescans = 0, n;
2018             while ((runState & STOP) == 0L && (qs = queues) != null &&
2019                    (n = qs.length) > 0) {
2020                 int i = r, step = (r >>> 16) | 1;
2021                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
2022                 scan: for (int j = n; j != 0; --j, i += step) {
2023                     WorkQueue q; int qid;
2024                     if ((q = qs[qid = i & (n - 1)]) != null) {
2025                         ForkJoinTask<?>[] a; int cap;     // poll queue
2026                         while ((a = q.array) != null && (cap = a.length) > 0) {
2027                             int b, nb, nk; long bp; ForkJoinTask<?> t;
2028                             t = (ForkJoinTask<?>)U.getReferenceAcquire(
2029                                 a, bp = slotOffset((cap - 1) & (b = q.base)));
2030                             if (q.base != b)
2031                                 continue;                 // inconsistent
2032                             long np = slotOffset(nk = (nb = b + 1) & (cap - 1));
2033                             if (t == null) {
2034                                 if (q.array != a)         // resized
2035                                     continue;
2036                                 if (rescans > 0)          // ran or stalled
2037                                     break scan;
2038                                 if (U.getReference(a, np) != null ||
2039                                     (rescans < 0 && q.top - b > 0)) {
2040                                     rescans = 1;          // may be stalled
2041                                     continue;
2042                                 }
2043                                 if (U.getReference(a, bp) != null)
2044                                     continue;             // stale
2045                                 break;                    // probably empty
2046                             }
2047                             if ((phase & IDLE) != 0 &&
2048                                 ((phase = tryReactivate(w, phase)) & IDLE) != 0) {
2049                                 rescans = 1;              // can't take yet
2050                                 break scan;
2051                             }
2052                             if (U.getReference(a, bp) == t &&
2053                                 U.compareAndSetReference(a, bp, t, null)) {
2054                                 q.base = nb;
2055                                 Object nt = U.getReferenceAcquire(a, np);
2056                                 w.source = qid;
2057                                 rescans = 1;
2058                                 if (nt != null &&         // confirm a[nk]
2059                                     U.getReferenceAcquire(a, np) == nt)
2060                                     signalWork(a, nk);    // propagate
2061                                 w.topLevelExec(t, q, fifo);
2062                             }
2063                         }
2064                     }
2065                 }
2066                 int prev;
2067                 if (rescans >= 0)
2068                     --rescans;
2069                 else if ((phase = deactivate(w, prev = phase)) == 0)
2070                     break;
2071                 else if (phase != prev)
2072                     rescans = 0;
2073             }
2074         }
2075     }
2076 
2077     /**
2078      * If active, tries to deactivate worker, keeping active on
2079      * contention; else awaits signal or termination.
2080      *
2081      * @param w the work queue
2082      * @param phase w's currently known phase
2083      * @return current phase or 0 on exit
2084      */
2085     private int deactivate(WorkQueue w, int phase) {
2086         if (w != null) {                          // always true; hoist checks
2087             if ((phase & IDLE) == 0) {
2088                 int idlePhase = phase | IDLE;
2089                 long pc = ctl, e;
2090                 long qc = ((phase + (IDLE << 1)) & LMASK) | ((pc - RC_UNIT) & UMASK);
2091                 w.stackPred = (int)pc;            // set ctl stack link
2092                 w.phase = idlePhase;              // try to enqueue
2093                 if (!compareAndSetCtl(pc, qc))
2094                     w.phase = phase;              // back out on contention
2095                 else {
2096                     phase = idlePhase;
2097                     if ((qc & RC_MASK) <= 0L && ((e = runState) & SHUTDOWN) != 0L &&
2098                         (e & STOP) == 0L)
2099                         quiescent();              // may trigger quiescent termination
2100                 }
2101             }
2102             else if ((runState & STOP) != 0L)
2103                 phase = 0;
2104             else {                                // spin before blocking
2105                 int activePhase = phase + IDLE;
2106                 int noise = activePhase | (activePhase >>> 16);
2107                 int spins = (SPIN_WAITS << 1) | (noise & (SPIN_WAITS - 1));
2108                 while ((phase = w.phase) != activePhase && --spins != 0)
2109                     Thread.onSpinWait();
2110                 if (spins == 0 && awaitWork(w, phase = activePhase) != 0)
2111                     phase = 0;
2112             }
2113         }
2114         return phase;
2115     }
2116 
2117     /**
2118      * Reactivates worker w if it is currently the top of the ctl stack.
2119      *
2120      * @param w the work queue
2121      * @param phase w's currently known (idle) phase
2122      * @return currently known phase on exit
2123      */
2124     private int tryReactivate(WorkQueue w, int phase) {
2125         int activePhase = phase + IDLE; long c;
2126         if (w != null && (phase = w.phase) != activePhase &&
2127             (int)(c = ctl) == activePhase &&
2128             compareAndSetCtl(c, (w.stackPred & LMASK) | ((c + RC_UNIT) & UMASK)))
2129             phase = w.phase = activePhase;
2130         return phase;
2131     }
2132 
2133     /**
2134      * Awaits signal or termination.
2135      *
2136      * @param w the work queue
2137      * @param activePhase w's next active phase
2138      * @return 0 if now active
2139      */
2140     private int awaitWork(WorkQueue w, int activePhase) {
2141         int idle = 1;
2142         if (w != null) {                      // always true; hoist checks
2143             long waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
2144             LockSupport.setCurrentBlocker(this);
2145             for (long deadline = 0L;;) {
2146                 Thread.interrupted();         // clear status
2147                 if ((runState & STOP) != 0L)
2148                     break;
2149                 if ((idle = w.phase - activePhase) == 0)
2150                     break;
2151                 boolean trimmable = false;    // use timed wait if trimmable
2152                 long d = 0L, c;
2153                 if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) {
2154                     long now = System.currentTimeMillis();
2155                     if (deadline == 0L)
2156                         deadline = waitTime + now;
2157                     if (deadline - now <= TIMEOUT_SLOP) {
2158                         if (tryTrim(w, c, activePhase))
2159                             break;
2160                         continue;             // lost race to trim
2161                     }
2162                     d = deadline;
2163                     trimmable = true;
2164                 }
2165                 w.parking = 1;                // enable unpark and recheck
2166                 if ((idle = w.phase - activePhase) != 0)
2167                     U.park(trimmable, d);
2168                 w.parking = 0;                // close unpark window
2169                 if (idle == 0 || (idle = w.phase - activePhase) == 0)
2170                     break;
2171             }
2172             LockSupport.setCurrentBlocker(null);
2173         }
2174         return idle;
2175     }
2176 
2177     /**
2178      * Tries to remove and deregister worker after timeout, and release
2179      * another to do the same unless new tasks are found.
2180      */
2181     private boolean tryTrim(WorkQueue w, long c, int activePhase) {
2182         if (w != null) {
2183             int vp, i; WorkQueue[] vs; WorkQueue v;
2184             long nc = ((w.stackPred & LMASK) |
2185                        ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
2186             if (compareAndSetCtl(c, nc)) {
2187                 w.source = DROPPED;
2188                 w.phase = activePhase;
2189                 if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2190                     vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2191                     compareAndSetCtl(           // try to wake up next waiter
2192                         nc, ((v.stackPred & LMASK) |
2193                              ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
2194                     v.source = INVALID_ID;      // enable cascaded timeouts
2195                     v.phase = vp;
2196                     U.unpark(v.owner);
2197                 }
2198                 return true;
2199             }
2200         }
2201         return false;
2202     }
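    // Cascade note: marking the released waiter's source as INVALID_ID
    // makes its awaitWork use a zero waitTime, so unless new work
    // arrives it times out almost immediately and trims itself in
    // turn, retiring spare workers one by one rather than all at once.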
2203 
2204     /**
2205      * Scans for and returns a polled task, if available.  Used only
2206      * for untracked polls. Begins scan at a random index to avoid
2207      * systematic unfairness.
2208      *
2209      * @param submissionsOnly if true, only scan submission queues
2210      */
2211     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
2212         if ((runState & STOP) == 0L) {
2213             WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
2214             int r = ThreadLocalRandom.nextSecondarySeed();
2215             if (submissionsOnly)                 // even indices only
2216                 r &= ~1;
2217             int step = (submissionsOnly) ? 2 : 1;
2218             if ((qs = queues) != null && (n = qs.length) > 0) {
2219                 for (int i = n; i > 0; i -= step, r += step) {
2220                     if ((q = qs[r & (n - 1)]) != null &&
2221                         (t = q.poll()) != null)

2598         else
2599             return 0;
2600     }
2601 
2602     /**
2603      * Gets and removes a local or stolen task for the given worker.
2604      *
2605      * @return a task, if available
2606      */
2607     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2608         ForkJoinTask<?> t;
2609         if (w == null || (t = w.nextLocalTask()) == null)
2610             t = pollScan(false);
2611         return t;
2612     }
2613 
2614     // External operations
2615 
2616     /**
2617      * Finds and locks a WorkQueue for an external submitter, or
2618      * throws RejectedExecutionException if shutdown.
2619      * @param rejectOnShutdown true if RejectedExecutionException
2620      *        should be thrown when shutdown
2621      */
2622     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2623         int r;
2624         if ((r = ThreadLocalRandom.getProbe()) == 0) {
2625             ThreadLocalRandom.localInit();   // initialize caller's probe
2626             r = ThreadLocalRandom.getProbe();
2627         }
2628         for (;;) {
2629             WorkQueue q; WorkQueue[] qs; int n, id, i;
2630             if ((qs = queues) == null || (n = qs.length) <= 0)
2631                 break;
2632             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2633                 WorkQueue newq = new WorkQueue(null, id, 0, false);
2634                 lockRunState();
2635                 if (qs[i] == null && queues == qs)
2636                     q = qs[i] = newq;         // else lost race to install
2637                 unlockRunState();
2638             }
2639             if (q != null && q.tryLockPhase()) {
2640                 if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2641                     q.unlockPhase();          // check while q lock held
2642                     break;
2643                 }
2644                 return q;
2645             }
2646             r = ThreadLocalRandom.advanceProbe(r); // move
2647         }
2648         throw new RejectedExecutionException();
2649     }
2650 
2651     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2652         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2653         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2654             (wt = (ForkJoinWorkerThread)t).pool == this) {
2655             internal = true;
2656             q = wt.workQueue;
2657         }
2658         else {                     // find and lock queue
2659             internal = false;
2660             q = externalSubmissionQueue(true);
2661         }
2662         q.push(task, signalIfEmpty ? this : null, internal);
2663         return task;
2664     }
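    // A minimal usage sketch of the two submission paths above: a
    // worker pushes directly to its own queue, while any other caller
    // goes through externalSubmissionQueue:
    //
    //   ForkJoinPool pool = new ForkJoinPool();  // external caller
    //   ForkJoinTask<Integer> f = pool.submit(() -> 6 * 7);
    //   int v = f.join();                        // 42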
2665 
2666     /**
2667      * Returns queue for an external thread, if one exists that has
2668      * possibly ever submitted to the given pool (nonzero probe), or
2669      * null if none.
2670      */
2671     static WorkQueue externalQueue(ForkJoinPool p) {
2672         WorkQueue[] qs; int n;
2673         int r = ThreadLocalRandom.getProbe();
2674         return (p != null && (qs = p.queues) != null &&
2675                 (n = qs.length) > 0 && r != 0) ?
2676             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2677     }
2678 
2679     /**
2680      * Returns external queue for common pool.
2681      */
2682     static WorkQueue commonQueue() {
2683         return externalQueue(common);
2684     }
2685 