src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule that task producers and
 555      * consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
 561      *
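A minimal self-contained analogue of this rule (hypothetical class, not pool code), with an AtomicLong standing in for ctl; the fully fenced getAndAdd(0) read-modify-write plays the role of a CAS to the current value:

    import java.util.concurrent.atomic.AtomicLong;

    class DekkerSketch {                        // illustrative only
        final AtomicLong ctl = new AtomicLong();
        volatile Runnable task;
        void publish(Runnable t) {              // producer side
            task = t;                           // make the task visible
            ctl.getAndAdd(0L);                  // fully fenced RMW on ctl
        }
        Runnable tryConsume() {                 // consumer side
            ctl.getAndAdd(0L);                  // pairs with producer's fence
            return task;                        // cannot miss a published task
        }
    }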
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  Method signalWork and its callers
 564      * try to approximate the unattainable goal of having the right
 565      * number of workers activated for the tasks at hand, but must err
 566      * on the side of too many workers vs too few to avoid stalls:
 567      *
 568      *  * If computations are purely tree structured, it suffices for
 569      *    every worker to activate another when it pushes a task into
 570      *    an empty queue, resulting in O(log(#threads)) steps to full
 571      *    activation (sketched after this list). Emptiness must be conservatively
 572      *    approximated, which may result in unnecessary signals.  Also, to reduce
 573      *    resource usages in some cases, at the expense of slower
 574      *    startup in others, activation of an idle thread is preferred
 575      *    over creating a new one, here and elsewhere.
 576      *
 577      *  * At the other extreme, if "flat" tasks (those that do not in
 578      *    turn generate others) come in serially from only a single
 579      *    producer, each worker taking a task from a queue should
 580      *    propagate a signal if there are more tasks in that
 581      *    queue. This is equivalent to, but generally faster than,
 582      *    arranging for the stealer to take multiple tasks, re-push one
 583      *    or more on its own queue, and signal (because its queue is
 584      *    empty), also resulting in logarithmic full activation
 585      *    time. If tasks do not engage in unbounded loops based on
 586      *    the actions of other workers with unknown dependencies,
 587      *    this form of propagation can be limited to one signal per
 588      *    activation (phase change). We distinguish the cases by
 589      *    further signalling only if the task is an InterruptibleTask
 590      *    (see below), which are the only supported forms of task that
 591      *    may do so.
 592      *
 593      *  * Because we don't know about usage patterns (or most commonly,
 594      *    mixtures), we use both approaches, which present even more
 595      *    opportunities to over-signal. (Failure to distinguish these
 596      *    cases in terms of submission methods was arguably an early
 597      *    design mistake.)  Note that in either of these contexts,
 598      *    signals may be (and often are) unnecessary because active
 599      *    workers continue scanning after running tasks without the
 600      *    need to be signalled (which is one reason work stealing is
 601      *    often faster than alternatives), so additional workers
 602      *    aren't needed.
 603      *
 604      * * For rapidly branching tasks that require full pool resources,
 605      *   oversignalling is OK, because signalWork will soon have no
 606      *   more workers to create or reactivate. But for others (mainly
 607      *   externally submitted tasks), overprovisioning may cause very
 608      *   noticeable slowdowns due to contention and resource
 609      *   wastage. We reduce impact by deactivating workers when
 610      *   queues don't have accessible tasks, but reactivating and
 611      *   rescanning if other tasks remain.
 612      *
 613      * * Despite these, signal contention and overhead effects still
 614      *   occur during ramp-up and ramp-down of small computations.
 615      *
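A back-of-envelope sketch of the logarithmic-activation claim in the bullets above, under the assumed doubling model (each round, every active worker activates one more):

    final class ActivationMath {                   // illustrative only
        static int roundsToActivate(int nThreads) {
            int rounds = 0;
            for (int active = 1; active < nThreads; active <<= 1)
                ++rounds;      // each round, every active worker signals one peer
            return rounds;     // == ceil(log2(nThreads))
        }
    }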
 616      * Scanning. Method runWorker performs top-level scanning for (and
 617      * execution of) tasks by polling a pseudo-random permutation of
 618      * the array (by starting at a given index, and using a constant
 619      * cyclically exhaustive stride.)  It uses the same basic polling
 620      * method as WorkQueue.poll(), but restarts with a different
 621      * permutation on each invocation.  The pseudorandom generator
 622      * need not have high-quality statistical properties in the long
 623      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 624      * from ThreadLocalRandom probes, which are cheap and
 625      * suffice. Each queue's polling attempts to avoid becoming stuck
 626      * when other scanners/pollers stall.  Scans do not otherwise
 627      * explicitly take into account core affinities, loads, cache
 628      * localities, etc. However, they do exploit temporal locality
 629      * (which usually approximates these) by preferring to re-poll
 630      * from the same queue after a successful poll before trying
 631      * others, which also reduces bookkeeping, cache traffic, and
 632      * scanning overhead. But it also reduces fairness, which is
 633      * partially counteracted by giving up on detected interference
 634      * (which also reduces contention when too many workers try to
 635      * take small tasks from the same queue).
 636      *
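The permutation arithmetic, extracted into a self-contained sketch (queues, visit, and the seed r are stand-ins): queue arrays always have power-of-two length, and forcing the stride odd makes it coprime to that length, so one pass visits every index exactly once.

    final class ScanSketch {                           // not pool code
        static void scanOnce(Object[] queues, int r) { // length: power of two
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5;  // Marsaglia xorshift
            int n = queues.length;
            int i = r, step = (r >>> 16) | 1;          // odd stride, coprime to n
            for (int l = n; l > 0; --l, i += step)
                visit(queues[i & (n - 1)]);            // visits every index once
        }
        static void visit(Object q) { }                // stand-in for polling q
    }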
 637      * Deactivation. When no tasks are found by a worker in runWorker,
 638      * it tries to deactivate(), giving up (and rescanning) on "ctl"
 639      * contention. To avoid missed signals during deactivation, the
 640      * method rescans and reactivates if tasks may have arrived in
 641      * the interim. To reduce false-alarm reactivations
 642      * while doing so, we scan multiple times (analogously to method
 643      * quiescent()) before trying to reactivate.  Because idle workers
 644      * are often not yet blocked (parked), we use a WorkQueue field to
 645      * advertise that a waiter actually needs unparking upon signal.
 646      *
 647      * Quiescence. Workers scan looking for work, giving up when they
 648      * don't find any, without being sure that none are available.
 649      * However, some required functionality relies on consensus about
 650      * quiescence (also termination, discussed below). The count
 651      * fields in ctl allow accurate discovery of states in which all
 652      * workers are idle.  However, because external (asynchronous)
 653      * submitters are not part of this vote, these mechanisms
 654      * themselves do not guarantee that the pool is in a quiescent
 655      * state with respect to methods isQuiescent, shutdown (which
 656      * begins termination when quiescent), helpQuiesce, and indirectly
 657      * others including tryCompensate. Method quiescent() is used in
 658      * all of these contexts. It provides checks that all workers are
 659      * idle and there are no submissions that they could poll if they
 660      * were not idle, retrying on inconsistent reads of queues and
 661      * using the runState seqLock to retry on queue array updates.
 662      * (It also reports quiescence if the pool is terminating.) A true
 663      * report means only that there was a moment at which quiescence
 664      * held.  False negatives are inevitable (for example when queue
 665      * indices lag updates, as described above), which is accommodated

 875      * ====================
 876      *
 877      * Regular ForkJoinTasks manage task cancellation (method cancel)
 878      * independently from the interrupt status of threads running
 879      * tasks.  Interrupts are issued internally only while
 880      * terminating, to wake up workers and cancel queued tasks.  By
 881      * default, interrupts are cleared only when necessary to ensure
 882      * that calls to LockSupport.park do not loop indefinitely (park
 883      * returns immediately if the current thread is interrupted).
 884      *
 885      * To comply with ExecutorService specs, we use subclasses of
 886      * abstract class InterruptibleTask for tasks that require
 887      * stronger interruption and cancellation guarantees.  External
 888      * submitters never run these tasks, even if in the common pool
 889      * (as indicated by ForkJoinTask.noUserHelp status bit).
 890      * InterruptibleTasks include a "runner" field (implemented
 891      * similarly to FutureTask) to support cancel(true).  Upon pool
 892      * shutdown, runners are interrupted so they can cancel. Since
 893      * external joining callers never run these tasks, they must await
 894      * cancellation by others, which can occur along several different
 895      * paths. The inability to rely on caller-runs may also require
 896      * extra signalling (resulting in scanning and contention), so it
 897      * is done only conditionally in methods push and runWorker.
 898      *
 899      * Across these APIs, rules for reporting exceptions for tasks
 900      * with results accessed via join() differ from those via get(),
 901      * which differ from those invoked using pool submit methods by
 902      * non-workers (which comply with Future.get() specs). Internal
 903      * usages of ForkJoinTasks ignore interrupt status when executing
 904      * or awaiting completion.  Otherwise, reporting task results or
 905      * exceptions is preferred to throwing InterruptedExceptions,
 906      * which are in turn preferred to timeouts. Similarly, completion
 907      * status is preferred to reporting cancellation.  Cancellation is
 908      * reported as an unchecked exception by join(), and by worker
 909      * calls to get(), but is otherwise wrapped in a (checked)
 910      * ExecutionException.
 911      *
 912      * Worker Threads cannot be VirtualThreads, as enforced by
 913      * requiring ForkJoinWorkerThreads in factories.  There are
 914      * several constructions relying on this.  However as of this
 915      * writing, virtual thread bodies are by default run as some form
 916      * of InterruptibleTask.
 917      *

 944      * the placement of their fields. Cache misses and contention due
 945      * to false-sharing have been observed to slow down some programs
 946      * by more than a factor of four. Effects may vary across initial
 947      * memory configurations, applications, and different garbage
 948      * collectors and GC settings, so there is no perfect solution.
 949      * Too much isolation may generate more cache misses in common
 950      * cases (because some fields and slots are usually read at the
 951      * same time). The @Contended annotation provides only rough
 952      * control (for good reason). Similarly for relying on fields
 953      * being placed in size-sorted declaration order.
 954      *
 955      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 956      * most false-sharing misses with respect to other fields. Also,
 957      * ForkJoinPool fields are ordered such that fields less prone to
 958      * contention effects are first, offsetting those that otherwise
 959      * would be, while also reducing total footprint vs using
 960      * multiple @Contended regions, which tends to slow down
 961      * less-contended applications. To help arrange this, some
 962      * non-reference fields are declared as "long" even when ints or
 963      * shorts would suffice.  For class WorkQueue, an
 964      * embedded @Contended region segregates fields most heavily
 965      * updated by owners from those most commonly read by stealers or
 966      * other management.
 967      *
 968      * Initial sizing and resizing of WorkQueue arrays is an even more
 969      * delicate tradeoff because the best strategy systematically
 970      * varies across garbage collectors. Small arrays are better for
 971      * locality and reduce GC scan time, but large arrays reduce both
 972      * direct false-sharing and indirect cases due to GC bookkeeping
 973      * (cardmarks etc), and reduce the number of resizes, which are
 974      * not especially fast because they require atomic transfers.
 975      * Currently, arrays for workers are initialized to be just large
 976      * enough to avoid resizing in most tree-structured tasks, but
 977      * larger for external queues where both false-sharing problems
 978      * and the need for resizing are more common. (Maintenance note:
 979      * any changes in fields, queues, or their uses, or JVM layout
 980      * policies, must be accompanied by re-evaluation of these
 981      * placement and sizing decisions.)
 982      *
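For concreteness, a small sketch of why power-of-two capacities matter here: slot indexing reduces to a single mask. The capacity constants mirror INITIAL_QUEUE_CAPACITY and INITIAL_EXTERNAL_QUEUE_CAPACITY declared later in this listing.

    final class SizingSketch {                 // illustrative only
        static final int WORKER_CAP   = 1 << 6; // INITIAL_QUEUE_CAPACITY
        static final int EXTERNAL_CAP = 1 << 9; // INITIAL_EXTERNAL_QUEUE_CAPACITY
        static int slotIndex(int k, int cap) {  // cap: a power of two
            return k & (cap - 1);               // wraparound without division
        }
    }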
 983      * Style notes
 984      * ===========
 985      *
 986      * Memory ordering relies mainly on atomic operations (CAS,
 987      * getAndSet, getAndAdd) along with moded accesses. These use
 988      * jdk-internal Unsafe for atomics and special memory modes,
 989      * rather than VarHandles, to avoid initialization dependencies in
 990      * other jdk components that require early parallelism.  This can
 991      * be awkward and ugly, but also reflects the need to control
 992      * outcomes across the unusual cases that arise in very racy code
 993      * with very few invariants. All atomic task slot updates use
 994      * Unsafe operations requiring offset positions, not indices, as
 995      * computed by method slotOffset. All fields are read into locals
 996      * before use, and null-checked if they are references, even if
 997      * they can never be null under current usages. Usually,
 998      * computations (held in local variables) are defined as soon as
 999      * logically enabled, sometimes to convince compilers that they
1000      * may be performed despite memory ordering constraints.  Array
1001      * accesses using masked indices include checks (that are always

1044      */
1045     static final long DEFAULT_KEEPALIVE = 60_000L;
1046 
1047     /**
1048      * Undershoot tolerance for idle timeouts, also serving as the
1049      * minimum allowed timeout value.
1050      */
1051     static final long TIMEOUT_SLOP = 20L;
1052 
1053     /**
1054      * The default value for common pool maxSpares.  Overridable using
1055      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1056      * system property.  The default value is far in excess of normal
1057      * requirements, but also far short of maximum capacity and typical OS
1058      * thread limits, so allows JVMs to catch misuse/abuse before
1059      * running out of resources needed to do so.
1060      */
1061     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1062 
1063     /**
1064      * Initial capacity of work-stealing queue array for workers.
1065      * Must be a power of two, at least 2. See above.
1066      */
1067     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1068 
1069     /**
1070      * Initial capacity of work-stealing queue array for external queues.
1071      * Must be a power of two, at least 2. See above.
1072      */
1073     static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
1074 
1075     // conversions among short, int, long
1076     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1077     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1078     static final long UMASK           = ~LMASK;      // upper 32 bits
1079 
1080     // masks and sentinels for queue indices
1081     static final int MAX_CAP          = 0x7fff;   // max # workers
1082     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1083     static final int INVALID_ID       = 0x4000;   // unused external queue id
1084 
1085     // pool.runState bits
1086     static final long STOP            = 1L <<  0;   // terminating
1087     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1088     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1089     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1090     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1091 
1092     // spin/sleep limits for runState locking and elsewhere
1093     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1094     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos

1186     static final class DefaultForkJoinWorkerThreadFactory
1187         implements ForkJoinWorkerThreadFactory {
1188         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1189             return ((pool.workerNamePrefix == null) ? // is commonPool
1190                     new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1191                     new ForkJoinWorkerThread(null, pool, true, false));
1192         }
1193     }
1194 
1195     /**
1196      * Queues supporting work-stealing as well as external task
1197      * submission. See above for descriptions and algorithms.
1198      */
1199     static final class WorkQueue {
1200         // fields declared in order of their likely layout on most VMs
1201         final ForkJoinWorkerThread owner; // null if shared
1202         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
1203         int base;                  // index of next slot for poll
1204         final int config;          // mode bits
1205 
1206         // fields otherwise causing more unnecessary false-sharing cache misses
1207         @jdk.internal.vm.annotation.Contended("w")
1208         int top;                   // index of next slot for push
1209         @jdk.internal.vm.annotation.Contended("w")
1210         volatile int phase;        // versioned active status
1211         @jdk.internal.vm.annotation.Contended("w")
1212         int stackPred;             // pool stack (ctl) predecessor link
1213         @jdk.internal.vm.annotation.Contended("w")
1214         volatile int source;       // source queue id (or DROPPED)
1215         @jdk.internal.vm.annotation.Contended("w")
1216         int nsteals;               // number of steals from other queues
1217         @jdk.internal.vm.annotation.Contended("w")
1218         volatile int parking;      // nonzero if parked in awaitWork
1219 
1220         // Support for atomic operations
1221         private static final Unsafe U;
1222         private static final long PHASE;
1223         private static final long BASE;
1224         private static final long TOP;
1225         private static final long ARRAY;
1226 
1227         final void updateBase(int v) {
1228             U.putIntVolatile(this, BASE, v);
1229         }
1230         final void updateTop(int v) {
1231             U.putIntOpaque(this, TOP, v);
1232         }
1233         final void updateArray(ForkJoinTask<?>[] a) {
1234             U.getAndSetReference(this, ARRAY, a);
1235         }
1236         final void unlockPhase() {
1237             U.getAndAddInt(this, PHASE, IDLE);
1238         }
1239         final boolean tryLockPhase() {    // seqlock acquire
1240             int p;
1241             return (((p = phase) & IDLE) != 0 &&
1242                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1243         }
1244 
1245         /**
1246          * Constructor. For internal queues, most fields are initialized
1247          * upon thread start in pool.registerWorker.
1248          */
1249         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1250                   boolean clearThreadLocals) {
1251             array = new ForkJoinTask<?>[owner == null ?
1252                                         INITIAL_EXTERNAL_QUEUE_CAPACITY :
1253                                         INITIAL_QUEUE_CAPACITY];
1254             this.owner = owner;
1255             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1256         }
1257 
1258         /**
1259          * Returns an exportable index (used by ForkJoinWorkerThread).
1260          */
1261         final int getPoolIndex() {
1262             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1263         }
1264 
1265         /**
1266          * Returns the approximate number of tasks in the queue.
1267          */
1268         final int queueSize() {
1269             int unused = phase;             // for ordering effect
1270             return Math.max(top - base, 0); // ignore transient negative
1271         }
1272 
1273         /**
1274          * Pushes a task. Called only by owner or if already locked.
1275          *
1276          * @param task the task; no-op if null
1277          * @param pool the pool to signal if was previously empty, else null
1278          * @param internal if caller owns this queue
1279          * @throws RejectedExecutionException if array could not be resized
1280          */
1281         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1282             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1283             if ((a = array) != null && (cap = a.length) > 0 && // else disabled
1284                 task != null) {
1285                 int pk = task.noUserHelp() + 1;             // prev slot offset
1286                 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1287                     top = s + 1;
1288                     long pos = slotOffset(m & s);
1289                     if (!internal)
1290                         U.putReference(a, pos, task);       // inside lock
1291                     else
1292                         U.getAndSetReference(a, pos, task); // fully fenced
1293                     if (room == 0)                          // resize
1294                         growArray(a, cap, s);
1295                 }
1296                 if (!internal)
1297                     unlockPhase();
1298                 if (room < 0)
1299                     throw new RejectedExecutionException("Queue capacity exceeded");
1300                 if ((room == 0 || a[m & (s - pk)] == null) &&
1301                     pool != null)
1302                     pool.signalWork();   // may have appeared empty
1303             }
1304         }
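The room arithmetic in push above, restated as a sketch (same meanings for top, base, and cap):

    final class RoomSketch {                      // illustrative only
        static int roomAfterPush(int top, int base, int cap) {
            return (cap - 1) - (top - base);
            // > 0 : plain push, free slots remain
            // == 0: this push fills the last slot; growArray runs now
            // < 0 : an earlier growth failed (OOME); the task is rejected
        }
    }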
1305 
1306         /**
1307          * Resizes the queue array unless out of memory.
1308          * @param a old array
1309          * @param cap old array capacity
1310          * @param s current top
1311          */
1312         private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
1313             int newCap = cap << 1;
1314             if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1315                 ForkJoinTask<?>[] newArray = null;
1316                 try {
1317                     newArray = new ForkJoinTask<?>[newCap];
1318                 } catch (OutOfMemoryError ex) {
1319                 }
1320                 if (newArray != null) {               // else throw on next push
1321                     int mask = cap - 1, newMask = newCap - 1;
1322                     for (int k = s, j = cap; j > 0; --j, --k) {
1323                         ForkJoinTask<?> u;            // poll old, push to new
1324                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1325                                  a, slotOffset(k & mask), null)) == null)
1326                             break;                    // lost to pollers
1327                         newArray[k & newMask] = u;
1328                     }
1329                     updateArray(newArray);           // fully fenced
1330                 }
1331                 }
1332         }
1333 
1334         /**
1335          * Takes next task, if one exists, in order specified by mode,
1336          * so acts as either local-pop or local-poll. Called only by owner.
1337          * @param fifo nonzero if FIFO mode
1338          */
1339         private ForkJoinTask<?> nextLocalTask(int fifo) {
1340             ForkJoinTask<?> t = null;
1341             ForkJoinTask<?>[] a = array;
1342             int b = base, p = top, cap;
1343             if (p - b > 0 && a != null && (cap = a.length) > 0) {
1344                 for (int m = cap - 1, s, nb;;) {
1345                     if (fifo == 0 || (nb = b + 1) == p) {
1346                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347                                  a, slotOffset(m & (s = p - 1)), null)) != null)
1348                             updateTop(s);       // else lost race for only task
1349                         break;
1350                     }
1351                     if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1352                              a, slotOffset(m & b), null)) != null) {
1353                         updateBase(nb);
1354                         break;
1355                     }
1356                     while (b == (b = U.getIntAcquire(this, BASE)))
1357                         Thread.onSpinWait();    // spin to reduce memory traffic
1358                     if (p - b <= 0)
1359                         break;
1360                 }
1361             }
1362             return t;
1363         }
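Behaviorally, the two modes take from opposite ends of the owner's deque, as in this ArrayDeque analogue (a sketch only; the real code above also pops a sole remaining task in either mode, where both ends coincide):

    import java.util.ArrayDeque;

    final class ModeSketch {                  // illustrative only
        static <T> T nextLocal(ArrayDeque<T> q, boolean fifo) {
            return fifo ? q.pollFirst()       // FIFO: oldest, the base end
                        : q.pollLast();       // LIFO (default): newest, the top end
        }
    }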
1364 
1365         /**
1366          * Takes next task, if one exists, using configured mode.
1367          * (Always internal, never called for Common pool.)
1368          */
1369         final ForkJoinTask<?> nextLocalTask() {
1370             return nextLocalTask(config & FIFO);
1371         }
1372 
1373         /**
1374          * Pops the given task only if it is at the current top.
1375          * @param task the task. Caller must ensure non-null.
1376          * @param internal if caller owns this queue
1377          */
1378         final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1379             boolean taken = false;
1380             ForkJoinTask<?>[] a = array;
1381             int p = top, s = p - 1, cap; long k;
1382             if (a != null && (cap = a.length) > 0 &&
1383                 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1384                 (internal || tryLockPhase())) {
1385                 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1386                     taken = true;
1387                     updateTop(s);
1388                 }
1389                 if (!internal)
1390                     unlockPhase();

1425                 Object u = U.getReference(         // next slot
1426                     a, slotOffset((cap - 1) & (nb = b + 1)));
1427                 if (base != b)                     // inconsistent
1428                     ;
1429                 else if (t == null) {
1430                     if (u == null && top - b <= 0)
1431                         break;                     // empty
1432                     if (pb == b)
1433                         Thread.onSpinWait();       // stalled
1434                 }
1435                 else if (U.compareAndSetReference(a, k, t, null)) {
1436                     updateBase(nb);
1437                     return t;
1438                 }
1439             }
1440             return null;
1441         }
1442 
1443         // specialized execution methods
1444 
1445         /**
1446          * Runs the given task, as well as remaining local tasks.
1447          */
1448         final void topLevelExec(ForkJoinTask<?> task, int fifo) {
1449             while (task != null) {
1450                 task.doExec();
1451                 task = nextLocalTask(fifo);
1452             }
1453         }
1454 
1455         /**
1456          * Deep form of tryUnpush: Traverses from top and removes and
1457          * runs task if present.
1458          */
1459         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1460             ForkJoinTask<?>[] a = array;
1461             int b = base, p = top, s = p - 1, d = p - b, cap;
1462             if (a != null && (cap = a.length) > 0) {
1463                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1464                     long k; boolean taken;
1465                     ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1466                         a, k = slotOffset(i & m));
1467                     if (t == null)
1468                         break;
1469                     if (t == task) {
1470                         if (!internal && !tryLockPhase())
1471                             break;                  // fail if locked

1561                 }
1562                 else if (!(t instanceof CompletableFuture
1563                            .AsynchronousCompletionTask))
1564                     break;
1565                 if (blocker != null && blocker.isReleasable())
1566                     break;
1567                 if (base == b && t != null &&
1568                     U.compareAndSetReference(a, k, t, null)) {
1569                     updateBase(b + 1);
1570                     t.doExec();
1571                 }
1572             }
1573         }
1574 
1575         // misc
1576 
1577         /**
1578          * Cancels all local tasks. Called only by owner.
1579          */
1580         final void cancelTasks() {
1581             for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
1582                 try {
1583                     t.cancel(false);
1584                 } catch (Throwable ignore) {
1585                 }
1586             }
1587         }
1588 
1589         /**
1590          * Returns true if internal and not known to be blocked.
1591          */
1592         final boolean isApparentlyUnblocked() {
1593             Thread wt; Thread.State s;
1594             return ((wt = owner) != null && (phase & IDLE) != 0 &&
1595                     (s = wt.getState()) != Thread.State.BLOCKED &&
1596                     s != Thread.State.WAITING &&
1597                     s != Thread.State.TIMED_WAITING);
1598         }
1599 
1600         static {
1601             U = Unsafe.getUnsafe();

1763         return false;
1764     }
1765 
1766     /**
1767      * Provides a name for ForkJoinWorkerThread constructor.
1768      */
1769     final String nextWorkerThreadName() {
1770         String prefix = workerNamePrefix;
1771         long tid = incrementThreadIds() + 1L;
1772         if (prefix == null) // commonPool has no prefix
1773             prefix = "ForkJoinPool.commonPool-worker-";
1774         return prefix.concat(Long.toString(tid));
1775     }
1776 
1777     /**
1778      * Finishes initializing and records internal queue.
1779      *
1780      * @param w caller's WorkQueue
1781      */
1782     final void registerWorker(WorkQueue w) {
1783         if (w != null && (runState & STOP) == 0L) {
1784             ThreadLocalRandom.localInit();
1785             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1786             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1787             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1788             long stop = lockRunState() & STOP;
1789             try {
1790                 WorkQueue[] qs; int n;
1791                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1792                     for (int k = n, m = n - 1;  ; id += 2) {
1793                         if (qs[id &= m] == null)
1794                             break;
1795                         if ((k -= 2) <= 0) {
1796                             id |= n;
1797                             break;
1798                         }
1799                     }
1800                     w.phase = id | phaseSeq;    // now publishable
1801                     if (id < n)
1802                         qs[id] = w;
1803                     else {                      // expand

1841             do {} while (c != (c = compareAndExchangeCtl(
1842                                    c, ((RC_MASK & (c - RC_UNIT)) |
1843                                        (TC_MASK & (c - TC_UNIT)) |
1844                                        (LMASK & c)))));
1845         }
1846         if (phase != 0 && w != null) {     // remove index unless terminating
1847             long ns = w.nsteals & 0xffffffffL;
1848             if ((runState & STOP) == 0L) {
1849                 WorkQueue[] qs; int n, i;
1850                 if ((lockRunState() & STOP) == 0L &&
1851                     (qs = queues) != null && (n = qs.length) > 0 &&
1852                     qs[i = phase & SMASK & (n - 1)] == w) {
1853                     qs[i] = null;
1854                     stealCount += ns;      // accumulate steals
1855                 }
1856                 unlockRunState();
1857             }
1858         }
1859         if ((tryTerminate(false, false) & STOP) == 0L &&
1860             phase != 0 && w != null && w.source != DROPPED) {
1861             signalWork();                  // possibly replace
1862             w.cancelTasks();               // clean queue
1863         }
1864         if (ex != null)
1865             ForkJoinTask.rethrow(ex);
1866     }
1867 
1868     /**
1869      * Releases an idle worker, or creates one if not enough exist.
1870      */
1871     final void signalWork() {
1872         int pc = parallelism;
1873         for (long c = ctl;;) {
1874             WorkQueue[] qs = queues;
1875             long ac = (c + RC_UNIT) & RC_MASK, nc;
1876             int sp = (int)c, i = sp & SMASK;
1877             if ((short)(c >>> RC_SHIFT) >= pc)
1878                 break;
1879             if (qs == null)
1880                 break;
1881             if (qs.length <= i)
1882                 break;
1883             WorkQueue w = qs[i], v = null;
1884             if (sp == 0) {
1885                 if ((short)(c >>> TC_SHIFT) >= pc)
1886                     break;
1887                 nc = ((c + TC_UNIT) & TC_MASK);
1888             }
1889             else if ((v = w) == null)
1890                 break;
1891             else
1892                 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1893             if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1894                 if (v == null)
1895                     createWorker();
1896                 else {
1897                     v.phase = sp;
1898                     if (v.parking != 0)
1899                         U.unpark(v.owner);
1900                 }
1901                 break;
1902             }
1903         }
1904     }
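The low 32 bits of ctl behave like a Treiber stack of idle workers whose next-links are stackPred values; signalWork pops it, deactivate pushes. A self-contained analogue (hypothetical Node type; the real push and pop are the ctl CASes in deactivate and signalWork):

    import java.util.concurrent.atomic.AtomicReference;

    final class IdleStackAnalogue {                  // illustrative only
        static final class Node { Node pred; }       // stackPred analogue
        final AtomicReference<Node> top = new AtomicReference<>();
        void push(Node n) {                          // as in deactivate()
            Node t;
            do { n.pred = t = top.get(); } while (!top.compareAndSet(t, n));
        }
        Node pop() {                                 // as in signalWork()
            Node t;
            do { if ((t = top.get()) == null) return null; }
            while (!top.compareAndSet(t, t.pred));
            return t;
        }
    }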
1905 
1906     /**
1907      * Releases all waiting workers. Called only during shutdown.
1908      */
1909     private void releaseWaiters() {
1910         for (long c = ctl;;) {
1911             WorkQueue[] qs; WorkQueue v; int sp, i;
1912             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1957                 else if ((e & SHUTDOWN) == 0)
1958                     return 0;
1959                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960                     return 0;
1961                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962                     return 1;                             // enable termination
1963                 else
1964                     break;                                // restart
1965             }
1966         }
1967     }
1968 
1969     /**
1970      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971      * See above for explanation.
1972      *
1973      * @param w caller's WorkQueue (may be null on failed initialization)
1974      */
1975     final void runWorker(WorkQueue w) {
1976         if (w != null) {
1977             int phase = w.phase, r = w.stackPred;     // seed from registerWorker
1978             int fifo = w.config & FIFO, nsteals = 0, src = -1;
1979             for (;;) {
1980                 WorkQueue[] qs;
1981                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1982                 if ((runState & STOP) != 0L || (qs = queues) == null)
1983                     break;
1984                 int n = qs.length, i = r, step = (r >>> 16) | 1;
1985                 boolean rescan = false;
1986                 scan: for (int l = n; l > 0; --l, i += step) {  // scan queues
1987                     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1988                     if ((q = qs[j = i & (n - 1)]) != null &&
1989                         (a = q.array) != null && (cap = a.length) > 0) {
1990                         for (int m = cap - 1, pb = -1, b = q.base;;) {
1991                             ForkJoinTask<?> t; long k;
1992                             t = (ForkJoinTask<?>)U.getReferenceAcquire(
1993                                 a, k = slotOffset(m & b));
1994                             if (b != (b = q.base) || t == null ||
1995                                 !U.compareAndSetReference(a, k, t, null)) {
1996                                 if (a[b & m] == null) {
1997                                     if (rescan)           // end of run
1998                                         break scan;
1999                                     if (a[(b + 1) & m] == null &&
2000                                         a[(b + 2) & m] == null) {
2001                                         break;            // probably empty
2002                                     }
2003                                     if (pb == (pb = b)) { // track progress
2004                                         rescan = true;    // stalled; reorder scan
2005                                         break scan;
2006                                     }
2007                             }
2008                             }
2009                             else {
2010                                 boolean propagate;
2011                                 int nb = q.base = b + 1, prevSrc = src;
2012                                 w.nsteals = ++nsteals;
2013                                 w.source = src = j;       // volatile
2014                                 rescan = true;
2015                                 int nh = t.noUserHelp();
2016                                 if (propagate =
2017                                     (prevSrc != src || nh != 0) && a[nb & m] != null)
2018                                     signalWork();
2019                                 w.topLevelExec(t, fifo);
2020                                 if ((b = q.base) != nb && !propagate)
2021                                     break scan;          // reduce interference
2022                             }
2023                         }
2024                     }
2025                 }
2026                 if (!rescan) {
2027                     if (((phase = deactivate(w, phase)) & IDLE) != 0)
2028                         break;
2029                     src = -1;                            // re-enable propagation
2030                 }
2031             }
2032         }
2033     }
2034 
2035     /**
2036      * Deactivates and if necessary awaits signal or termination.
2037      *
2038      * @param w the worker
2039      * @param phase current phase
2040      * @return current phase, with IDLE set if worker should exit
2041      */
2042     private int deactivate(WorkQueue w, int phase) {
2043         if (w == null)                        // currently impossible
2044             return IDLE;
2045         int p = phase | IDLE, activePhase = phase + (IDLE << 1);
2046         long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
2047         int sp = w.stackPred = (int)pc;       // set ctl stack link
2048         w.phase = p;
2049         if (!compareAndSetCtl(pc, qc))        // try to enqueue
2050             return w.phase = phase;           // back out on possible signal
2051         int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
2052         if (((e = runState) & STOP) != 0L ||
2053             ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
2054             (qs = queues) == null || (n = qs.length) <= 0)
2055             return IDLE;                      // terminating
2056 
2057         for (int prechecks = Math.min(ac, 2), // reactivation threshold
2058              k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
2059             WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
2060             if (w.phase == activePhase)
2061                 return activePhase;
2062             if (--k < 0)
2063                 return awaitWork(w, p);       // block, drop, or exit
2064             if ((q = qs[k & (n - 1)]) == null)
2065                 Thread.onSpinWait();
2066             else if ((a = q.array) != null && (cap = a.length) > 0 &&
2067                      a[q.base & (cap - 1)] != null && --prechecks < 0 &&
2068                      (int)(c = ctl) == activePhase &&
2069                      compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
2070                 return w.phase = activePhase; // reactivate
2071         }
2072     }
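An outline of the ordering that makes this work (hypothetical helper names; the real steps are inlined in the method above): idleness is advertised and the worker enqueued on ctl before the conservative rescan, so a producer that pushes after the rescan must observe the idle worker and signal it.

    abstract class DeactivateOutline {             // hypothetical sketch
        abstract boolean pushOnIdleStack();        // CAS ctl; false if race lost
        abstract boolean queuesLookNonEmpty();     // conservative multi-pass scan
        abstract void reactivate();                // self-signal
        abstract void block();                     // park in awaitWork
        final void deactivate() {
            if (!pushOnIdleStack())
                return;                            // possible signal; stay active
            if (queuesLookNonEmpty())
                reactivate();                      // avoid a missed signal
            else
                block();                           // safe: signallers see us idle
        }
    }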
2073 
2074     /**
2075      * Awaits signal or termination.
2076      *
2077      * @param w the work queue
2078      * @param p current phase (known to be idle)
2079      * @return current phase, with IDLE set if worker should exit
2080      */
2081     private int awaitWork(WorkQueue w, int p) {
2082         if (w != null) {
2083             ForkJoinWorkerThread t; long deadline;
2084             if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
2085                 t.resetThreadLocals();          // clear before reactivate
2086             if ((ctl & RC_MASK) > 0L)
2087                 deadline = 0L;
2088             else if ((deadline =
2089                       (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
2090                       System.currentTimeMillis()) == 0L)
2091                 deadline = 1L;                 // avoid zero
2092             int activePhase = p + IDLE;
2093             if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
2094                 LockSupport.setCurrentBlocker(this);
2095                 w.parking = 1;                 // enable unpark
2096                 while ((p = w.phase) != activePhase) {
2097                     boolean trimmable = false; int trim;
2098                     Thread.interrupted();      // clear status
2099                     if ((runState & STOP) != 0L)
2100                         break;
2101                     if (deadline != 0L) {
2102                         if ((trim = tryTrim(w, p, deadline)) > 0)
2103                             break;
2104                         else if (trim < 0)
2105                             deadline = 0L;
2106                         else
2107                             trimmable = true;
2108                     }
2109                     U.park(trimmable, deadline);
2110                 }
2111                 w.parking = 0;
2112                 LockSupport.setCurrentBlocker(null);
2113             }
2114         }
2115         return p;
2116     }
2117 
2118     /**
2119      * Tries to remove and deregister worker after timeout, and release
2120      * another to do the same.
2121      * @return > 0: trimmed, < 0: not trimmable, else 0
2122      */
2123     private int tryTrim(WorkQueue w, int phase, long deadline) {
2124         long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2125         if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2126             stat = -1;                      // no longer ctl top
2127         else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2128             stat = 0;                       // spurious wakeup
2129         else if (!compareAndSetCtl(
2130                      c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2131                                (TC_MASK & (c - TC_UNIT)))))
2132             stat = -1;                      // lost race to signaller
2133         else {
2134             stat = 1;
2135             w.source = DROPPED;
2136             w.phase = activePhase;
2137             if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2138                 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2139                 compareAndSetCtl(           // try to wake up next waiter
2140                     nc, ((UMASK & (nc + RC_UNIT)) |
2141                          (nc & TC_MASK) | (v.stackPred & LMASK)))) {
2142                 v.source = INVALID_ID;      // enable cascaded timeouts
2143                 v.phase = vp;
2144                 U.unpark(v.owner);
2145             }
2146         }
2147         return stat;
2148     }
2149 
2150     /**
2151      * Scans for and returns a polled task, if available.  Used only
2152      * for untracked polls. Begins scan at a random index to avoid
2153      * systematic unfairness.
2154      *
2155      * @param submissionsOnly if true, only scan submission queues
2156      */
2157     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
2158         if ((runState & STOP) == 0L) {
2159             WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
2160             int r = ThreadLocalRandom.nextSecondarySeed();
2161             if (submissionsOnly)                 // even indices only
2162                 r &= ~1;
2163             int step = (submissionsOnly) ? 2 : 1;
2164             if ((qs = queues) != null && (n = qs.length) > 0) {
2165                 for (int i = n; i > 0; i -= step, r += step) {
2166                     if ((q = qs[r & (n - 1)]) != null &&
2167                         (t = q.poll()) != null)

2544         else
2545             return 0;
2546     }
2547 
2548     /**
2549      * Gets and removes a local or stolen task for the given worker.
2550      *
2551      * @return a task, if available
2552      */
2553     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554         ForkJoinTask<?> t;
2555         if (w == null || (t = w.nextLocalTask()) == null)
2556             t = pollScan(false);
2557         return t;
2558     }
2559 
2560     // External operations
2561 
2562     /**
2563      * Finds and locks a WorkQueue for an external submitter, or
2564      * throws RejectedExecutionException if shutdown or terminating.
2565      * @param r current ThreadLocalRandom.getProbe() value
2566      * @param rejectOnShutdown true if RejectedExecutionException
2567      *        should be thrown when shutdown (else only if terminating)
2568      */
2569     private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
2570         int reuse;                                   // nonzero if prefer create
2571         if ((reuse = r) == 0) {
2572             ThreadLocalRandom.localInit();           // initialize caller's probe
2573             r = ThreadLocalRandom.getProbe();
2574         }
2575         for (int probes = 0; ; ++probes) {
2576             int n, i, id; WorkQueue[] qs; WorkQueue q;
2577             if ((qs = queues) == null)
2578                 break;
2579             if ((n = qs.length) <= 0)
2580                 break;
2581             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582                 WorkQueue w = new WorkQueue(null, id, 0, false);
2583                 w.phase = id;
2584                 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585                                   rejectOnShutdown);
2586                 if (!reject && queues == qs && qs[i] == null)
2587                     q = qs[i] = w;                   // else lost race to install
2588                 unlockRunState();
2589                 if (q != null)
2590                     return q;
2591                 if (reject)
2592                     break;
2593                 reuse = 0;
2594             }
2595             if (reuse == 0 || !q.tryLockPhase()) {   // move index
2596                 if (reuse == 0) {
2597                     if (probes >= n >> 1)
2598                         reuse = r;                   // stop preferring free slot
2599                 }
2600                 else if (q != null)
2601                     reuse = 0;                       // probe on collision
2602                 r = ThreadLocalRandom.advanceProbe(r);
2603             }
2604             else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605                 q.unlockPhase();                     // check while q lock held
2606                 break;
2607             }
2608             else
2609                 return q;
2610         }
2611         throw new RejectedExecutionException();
2612     }
2613 
2614     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617             (wt = (ForkJoinWorkerThread)t).pool == this) {
2618             internal = true;
2619             q = wt.workQueue;
2620         }
2621         else {                     // find and lock queue
2622             internal = false;
2623             q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624         }
2625         q.push(task, signalIfEmpty ? this : null, internal);
2626         return task;
2627     }
2628 
2629     /**
2630      * Returns queue for an external submission, bypassing call to
2631      * submissionQueue if already established and unlocked.
2632      */
2633     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634         WorkQueue[] qs; WorkQueue q; int n;
2635         int r = ThreadLocalRandom.getProbe();
2636         return (((qs = queues) != null && (n = qs.length) > 0 &&
2637                  (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638                  q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639     }
2640 
2641     /**
2642      * Returns queue for an external thread, if one exists that has
2643      * possibly ever submitted to the given pool (nonzero probe), or
2644      * null if none.
2645      */
2646     static WorkQueue externalQueue(ForkJoinPool p) {
2647         WorkQueue[] qs; int n;
2648         int r = ThreadLocalRandom.getProbe();
2649         return (p != null && (qs = p.queues) != null &&
2650                 (n = qs.length) > 0 && r != 0) ?
2651             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2652     }
2653 
2654     /**
2655      * Returns external queue for common pool.
2656      */
2657     static WorkQueue commonQueue() {
2658         return externalQueue(common);
2659     }
2660 

 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule that task producers and
 555      * consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
 561      *
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  SignalWork is invoked in two cases:
 564      * (1) When a task is pushed onto an empty queue, and (2) When a
 565      * worker takes a top-level task from a queue that has additional
 566      * tasks. Together, these suffice in O(log(#threads)) steps to
 567      * fully activate with at least enough workers, and ideally no
 568      * more than required.  This ideal is unobtainable: Callers do not
 569      * know whether another worker will finish its current task and
 570      * poll for others without need of a signal (which is otherwise an
 571      * advantage of work-stealing vs other schemes), and also must
 572      * conservatively estimate the triggering conditions of emptiness
 573      * or non-emptiness; all of which usually cause more activations
 574      * than necessary (see below). (Method signalWork is also used as a
 575      * failsafe, in case of thread failures in deregisterWorker, to
 576      * activate or create a new worker to replace them.)
 577      *
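Concretely, the two cases correspond to the two signal sites later in this listing; case (1) is quoted from WorkQueue.push below (case (2), the take-side propagation in runWorker, is sketched after the Oversignalling paragraph that follows):

    if ((room == 0 || a[m & (s - pk)] == null) && pool != null)
        pool.signalWork();   // may have appeared empty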
 578      * Top-Level scheduling
 579      * ====================
 580      *
 581      * Scanning. Method runWorker performs top-level scanning for (and
 582      * execution of) tasks by polling a pseudo-random permutation of
 583      * the array (by starting at a given index, and using a constant
 584      * cyclically exhaustive stride.)  It uses the same basic polling
 585      * method as WorkQueue.poll(), but restarts with a different
 586      * permutation on each rescan.  The pseudorandom generator need
 587      * not have high-quality statistical properties in the long
 588      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 589      * from ThreadLocalRandom probes, which are cheap and suffice.
 590      *
 591      * Deactivation. When no tasks are found by a worker in runWorker,
 592      * it invokes awaitWork, which first deactivates (to an IDLE
 593      * phase).  Avoiding missed signals during deactivation requires a
 594      * (conservative) rescan, reactivating if there may be tasks to
 595      * poll. Because idle workers are often not yet blocked (parked),
 596      * we use a WorkQueue field to advertise that a waiter actually
 597      * needs unparking upon signal.
 598      *
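A self-contained analogue of this advertise-then-park handshake (hypothetical Waiter class; the real code uses the WorkQueue.parking and phase fields, as in awaitWork and signalWork):

    import java.util.concurrent.locks.LockSupport;

    final class WaiterSketch {                 // construct on the waiting thread
        volatile int parking;                  // nonzero: signaller must unpark
        volatile boolean signalled;
        final Thread owner = Thread.currentThread();
        void await() {                         // waiter side
            parking = 1;                       // advertise before blocking
            while (!signalled)
                LockSupport.park(this);
            parking = 0;
        }
        void signal() {                        // signaller, after publishing work
            signalled = true;
            if (parking != 0)                  // skip unpark for spinning waiters
                LockSupport.unpark(owner);
        }
    }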
 599      * When tasks are constructed as (recursive) DAGs, top-level
 600      * scanning is usually infrequent, and doesn't encounter most
 601      * of the following problems addressed by runWorker and awaitWork:
 602      *
 603      * Locality. Polls are organized into "runs", continuing until
 604      * empty or contended, while also minimizing interference by
 605      * postponing bookkeeping to ends of runs. This may reduce
 606      * fairness.
 607      *
 608      * Contention. When many workers try to poll few queues, they
 609      * often collide, generating CAS failures and disrupting locality
 610      * of workers already running their tasks. This also leads to
 611      * stalls when tasks cannot be taken because other workers have
 612      * not finished poll operations, which is detected by reading
 613      * ahead in queue arrays. In both cases, workers restart scans in a
 614      * way that approximates randomized backoff.
 615      *
 616      * Oversignalling. When many short top-level tasks are present in
 617      * a small number of queues, the above signalling strategy may
 618      * activate many more workers than needed, worsening locality and
 619      * contention problems, while also generating more global
 620      * contention (field ctl is CASed on every activation and
 621      * deactivation). We filter out (both in runWorker and
 622      * signalWork) attempted signals that are surely not needed
 623      * because the signalled tasks are already taken.
 624      *
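The take-side filter, condensed from runWorker earlier in this listing: after claiming the task at base b, a signal propagates only when the next slot is observably nonempty and propagation is enabled for this source or task type.

    int nb = q.base = b + 1;                      // claim slot b
    boolean propagate =
        (prevSrc != src || nh != 0) && a[nb & m] != null;
    if (propagate)
        signalWork();                             // only if work observably remains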
 625      * Shutdown and Quiescence
 626      * =======================
 627      *
 628      * Quiescence. Workers scan looking for work, giving up when they
 629      * don't find any, without being sure that none are available.
 630      * However, some required functionality relies on consensus about
 631      * quiescence (also termination, discussed below). The count
 632      * fields in ctl allow accurate discovery of states in which all
 633      * workers are idle.  However, because external (asynchronous)
 634      * submitters are not part of this vote, these mechanisms
 635      * themselves do not guarantee that the pool is in a quiescent
 636      * state with respect to methods isQuiescent, shutdown (which
 637      * begins termination when quiescent), helpQuiesce, and indirectly
 638      * others including tryCompensate. Method quiescent() is used in
 639      * all of these contexts. It provides checks that all workers are
 640      * idle and there are no submissions that they could poll if they
 641      * were not idle, retrying on inconsistent reads of queues and
 642      * using the runState seqLock to retry on queue array updates.
 643      * (It also reports quiescence if the pool is terminating.) A true
 644      * report means only that there was a moment at which quiescence
 645      * held.  False negatives are inevitable (for example when queue
 646      * indices lag updates, as described above), which is accommodated

 856      * ====================
 857      *
 858      * Regular ForkJoinTasks manage task cancellation (method cancel)
 859      * independently from the interrupt status of threads running
 860      * tasks.  Interrupts are issued internally only while
 861      * terminating, to wake up workers and cancel queued tasks.  By
 862      * default, interrupts are cleared only when necessary to ensure
 863      * that calls to LockSupport.park do not loop indefinitely (park
 864      * returns immediately if the current thread is interrupted).
 865      *
 866      * To comply with ExecutorService specs, we use subclasses of
 867      * abstract class InterruptibleTask for tasks that require
 868      * stronger interruption and cancellation guarantees.  External
 869      * submitters never run these tasks, even if in the common pool
 870      * (as indicated by ForkJoinTask.noUserHelp status bit).
 871      * InterruptibleTasks include a "runner" field (implemented
 872      * similarly to FutureTask) to support cancel(true).  Upon pool
 873      * shutdown, runners are interrupted so they can cancel. Since
 874      * external joining callers never run these tasks, they must await
 875      * cancellation by others, which can occur along several different
 876      * paths.
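          *
          * A sketch of the runner-based cancel(true) path (illustrative
          * only; trySetCancelled stands in for the actual status CAS):
          *
          *   Thread r = runner;  // recorded when the task begins running
          *   if (trySetCancelled() && mayInterruptIfRunning && r != null)
          *       r.interrupt();  // as in FutureTask.cancel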


 877      *
 878      * Across these APIs, rules for reporting exceptions for tasks
 879      * with results accessed via join() differ from those via get(),
 880      * which differ from those invoked using pool submit methods by
 881      * non-workers (which comply with Future.get() specs). Internal
 882      * usages of ForkJoinTasks ignore interrupt status when executing
 883      * or awaiting completion.  Otherwise, reporting task results or
 884      * exceptions is preferred to throwing InterruptedExceptions,
 885      * which are in turn preferred to timeouts. Similarly, completion
 886      * status is preferred to reporting cancellation.  Cancellation is
 887      * reported as an unchecked exception by join(), and by worker
 888      * calls to get(), but is otherwise wrapped in a (checked)
 889      * ExecutionException.
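          *
          * For example, as a consequence of these rules (not code in
          * this file): if a task body throws RuntimeException ex, then
          * task.join() rethrows ex directly (unchecked), while an
          * external task.get() wraps it as the cause of a checked
          * ExecutionException.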
 890      *
 891      * Worker Threads cannot be VirtualThreads, as enforced by
 892      * requiring ForkJoinWorkerThreads in factories.  There are
 893      * several constructions relying on this.  However, as of this
 894      * writing, virtual thread bodies are by default run as some form
 895      * of InterruptibleTask.
 896      *

 923      * the placement of their fields. Cache misses and contention due
 924      * to false-sharing have been observed to slow down some programs
 925      * by more than a factor of four. Effects may vary across initial
 926      * memory configurations, applications, and different garbage
 927      * collectors and GC settings, so there is no perfect solution.
 928      * Too much isolation may generate more cache misses in common
 929      * cases (because some fields and slots are usually read at the
 930      * same time). The @Contended annotation provides only rough
 931      * control (for good reason). Similarly for relying on fields
 932      * being placed in size-sorted declaration order.
 933      *
 934      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 935      * most false-sharing misses with respect to other fields. Also,
 936      * ForkJoinPool fields are ordered such that fields less prone to
 937      * contention effects are first, offsetting those that otherwise
 938      * would be, while also reducing total footprint vs using
 939      * multiple @Contended regions, which tends to slow down
 940      * less-contended applications. To help arrange this, some
 941      * non-reference fields are declared as "long" even when ints or
 942      * shorts would suffice.  For class WorkQueue, an
 943      * embedded @Contended isolates the very busy top index, and
 944      * another segregates status and bookkeeping fields written
 945      * (mostly) by owners, that otherwise interfere with reading
 946      * array, top, and base fields. There are other variables commonly
 947      * contributing to false-sharing-related performance issues
 948      * (including fields of class Thread), but we can't do much about
 949      * this except try to minimize access.
 950      *
 951      * Initial sizing and resizing of WorkQueue arrays is an even more
 952      * delicate tradeoff because the best strategy systematically
 953      * varies across garbage collectors. Small arrays are better for
 954      * locality and reduce GC scan time, but large arrays reduce both
 955      * direct false-sharing and indirect cases due to GC bookkeeping
 956      * (cardmarks etc), and reduce the number of resizes, which are
 957      * not especially fast because they require atomic transfers.
 958      * Currently, arrays are initialized to be just large enough to
 959      * avoid resizing in most tree-structured tasks, but grow rapidly
 960      * until large.  (Maintenance note: any changes in fields, queues,
 961      * or their uses, or JVM layout policies, must be accompanied by
 962      * re-evaluation of these placement and sizing decisions.)
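          *
          * Concretely, given INITIAL_QUEUE_CAPACITY and growArray
          * below: arrays start at 1 << 6 = 64 slots and quadruple on
          * each resize (64, 256, 1024, ...) until reaching 1 << 16
          * slots, after which they double.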


 963      *
 964      * Style notes
 965      * ===========
 966      *
 967      * Memory ordering relies mainly on atomic operations (CAS,
 968      * getAndSet, getAndAdd) along with moded accesses. These use
 969      * jdk-internal Unsafe for atomics and special memory modes,
 970      * rather than VarHandles, to avoid initialization dependencies in
 971      * other jdk components that require early parallelism.  This can
 972      * be awkward and ugly, but also reflects the need to control
 973      * outcomes across the unusual cases that arise in very racy code
 974      * with very few invariants. All atomic task slot updates use
 975      * Unsafe operations requiring offset positions, not indices, as
 976      * computed by method slotOffset. All fields are read into locals
 977      * before use, and null-checked if they are references, even if
 978      * they can never be null under current usages. Usually,
 979      * computations (held in local variables) are defined as soon as
 980      * logically enabled, sometimes to convince compilers that they
 981      * may be performed despite memory ordering constraints.  Array
 982      * accesses using masked indices include checks (that are always

1025      */
1026     static final long DEFAULT_KEEPALIVE = 60_000L;
1027 
1028     /**
1029      * Undershoot tolerance for idle timeouts, also serving as the
1030      * minimum allowed timeout value.
1031      */
1032     static final long TIMEOUT_SLOP = 20L;
1033 
1034     /**
1035      * The default value for common pool maxSpares.  Overridable using
1036      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1037      * system property.  The default value is far in excess of normal
1038      * requirements, but also far short of maximum capacity and typical OS
 1039      * thread limits, so it allows JVMs to catch misuse/abuse before
1040      * running out of resources needed to do so.
1041      */
1042     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1043 
1044     /**
1045      * Initial capacity of work-stealing queue array.
1046      * Must be a power of two, at least 2. See above.
1047      */
1048     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1049 






1050     // conversions among short, int, long
1051     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1052     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1053     static final long UMASK           = ~LMASK;      // upper 32 bits
1054 
1055     // masks and sentinels for queue indices
1056     static final int MAX_CAP          = 0x7fff;   // max # workers
1057     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1058     static final int INVALID_ID       = 0x4000;   // unused external queue id
1059 
1060     // pool.runState bits
1061     static final long STOP            = 1L <<  0;   // terminating
1062     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1063     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1064     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1065     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1066 
1067     // spin/sleep limits for runState locking and elsewhere
1068     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1069     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos

1161     static final class DefaultForkJoinWorkerThreadFactory
1162         implements ForkJoinWorkerThreadFactory {
1163         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1164             return ((pool.workerNamePrefix == null) ? // is commonPool
1165                     new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1166                     new ForkJoinWorkerThread(null, pool, true, false));
1167         }
1168     }
1169 
1170     /**
1171      * Queues supporting work-stealing as well as external task
1172      * submission. See above for descriptions and algorithms.
1173      */
1174     static final class WorkQueue {
1175         // fields declared in order of their likely layout on most VMs
1176         final ForkJoinWorkerThread owner; // null if shared
1177         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
1178         int base;                  // index of next slot for poll
1179         final int config;          // mode bits
1180 
1181         @jdk.internal.vm.annotation.Contended("t") // segregate

1182         int top;                   // index of next slot for push
1183 
1184         // fields otherwise causing more unnecessary false-sharing cache misses
1185         @jdk.internal.vm.annotation.Contended("w")
1186         volatile int phase;        // versioned active status
1187         @jdk.internal.vm.annotation.Contended("w")
1188         int stackPred;             // pool stack (ctl) predecessor link
1189         @jdk.internal.vm.annotation.Contended("w")
1190         volatile int parking;      // nonzero if parked in awaitWork
1191         @jdk.internal.vm.annotation.Contended("w")
1192         volatile int source;       // source queue id (or DROPPED)
1193         @jdk.internal.vm.annotation.Contended("w")
1194         int nsteals;               // number of steals from other queues


1195 
1196         // Support for atomic operations
1197         private static final Unsafe U;
1198         private static final long PHASE;
1199         private static final long BASE;
1200         private static final long TOP;
1201         private static final long ARRAY;
1202 
1203         final void updateBase(int v) {
1204             U.putIntVolatile(this, BASE, v);
1205         }
1206         final void updateTop(int v) {
1207             U.putIntOpaque(this, TOP, v);
1208         }
1209         final void updateArray(ForkJoinTask<?>[] a) {
1210             U.getAndSetReference(this, ARRAY, a);
1211         }
1212         final void unlockPhase() {
1213             U.getAndAddInt(this, PHASE, IDLE);
1214         }
1215         final boolean tryLockPhase() {    // seqlock acquire
1216             int p;
1217             return (((p = phase) & IDLE) != 0 &&
1218                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1219         }
1220 
1221         /**
1222          * Constructor. For internal queues, most fields are initialized
1223          * upon thread start in pool.registerWorker.
1224          */
1225         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1226                   boolean clearThreadLocals) {




1227             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1228             if ((this.owner = owner) == null) {
1229                 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1230                 phase = id | IDLE;
1231             }
1232         }
1233 
1234         /**
1235          * Returns an exportable index (used by ForkJoinWorkerThread).
1236          */
1237         final int getPoolIndex() {
1238             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1239         }
1240 
1241         /**
1242          * Returns the approximate number of tasks in the queue.
1243          */
1244         final int queueSize() {
1245             int unused = phase;             // for ordering effect
1246             return Math.max(top - base, 0); // ignore transient negative
1247         }
1248 
1249         /**
 1250         * Pushes a task. Called only by owner or if already locked.
1251          *
1252          * @param task the task; no-op if null
 1253         * @param pool the pool to signal if this queue was previously empty, else null
1254          * @param internal if caller owns this queue
1255          * @throws RejectedExecutionException if array could not be resized
1256          */
1257         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1258             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a, na;
1259             if ((a = array) != null && (cap = a.length) > 0) { // else disabled
1260                 int k = (m = cap - 1) & s;
1261                 if ((room = m - (s - b)) >= 0) {

1262                     top = s + 1;
1263                     long pos = slotOffset(k);
1264                     if (!internal)
1265                         U.putReference(a, pos, task);       // inside lock
1266                     else
1267                         U.getAndSetReference(a, pos, task); // fully fenced
1268                     if (room == 0 && (na = growArray(a, cap, s)) != null)
1269                         k = ((a = na).length - 1) & s;      // resize
1270                 }
1271                 if (!internal)
1272                     unlockPhase();
1273                 if (room < 0)
1274                     throw new RejectedExecutionException("Queue capacity exceeded");
1275                 if (pool != null &&
1276                     (room == 0 ||
1277                      U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
1278                     pool.signalWork(a, k);    // may have appeared empty
1279             }
1280         }
1281 
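             /*
              * Index arithmetic used by push (illustrative note): with
              * mask m = cap - 1, room = m - (s - b) = (cap - 1) - size.
              * room == 0 means this push fills the array, triggering
              * growArray; room < 0 can occur only after a previous
              * failed (OOM) resize, and rejects the push.
              */
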
1282         /**
1283          * Resizes the queue array unless out of memory.
1284          * @param a old array
1285          * @param cap old array capacity
1286          * @param s current top
1287          * @return new array, or null on failure
1288          */
1289         private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
1290             int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
1291             ForkJoinTask<?>[] newArray = null;
1292             if (a != null && a.length == cap && cap > 0 && newCap > 0) {

1293                 try {
1294                     newArray = new ForkJoinTask<?>[newCap];
1295                 } catch (OutOfMemoryError ex) {
1296                 }
1297                 if (newArray != null) {               // else throw on next push
1298                     int mask = cap - 1, newMask = newCap - 1;
1299                     for (int k = s, j = cap; j > 0; --j, --k) {
1300                         ForkJoinTask<?> u;            // poll old, push to new
1301                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1302                                  a, slotOffset(k & mask), null)) == null)
1303                             break;                    // lost to pollers
1304                         newArray[k & newMask] = u;
1305                     }
1306                     updateArray(newArray);           // fully fenced
1307                 }
1308             }
1309             return newArray;
1310         }
1311 
1312         /**
1313          * Takes next task, if one exists, in lifo order.


1314          */
1315         private ForkJoinTask<?> localPop() {
1316             ForkJoinTask<?> t = null;
1317             int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
1318             if ((a = array) != null && (cap = a.length) > 0 &&
1319                 U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
1320                 (t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
1321                 updateTop(s);
1322             return t;
1323         }
1324 
1325         /**
1326          * Takes next task, if one exists, in fifo order.
1327          */
1328         private ForkJoinTask<?> localPoll() {
1329             ForkJoinTask<?> t = null;
1330             int p = top, cap; ForkJoinTask<?>[] a;
1331             if ((a = array) != null && (cap = a.length) > 0) {
1332                 for (int b = base; p - b > 0; ) {
1333                     int nb = b + 1;
1334                     long k = slotOffset((cap - 1) & b);
1335                     if (U.getReference(a, k) == null) {
1336                         if (nb == p)
1337                             break;          // else base is lagging
1338                         while (b == (b = U.getIntAcquire(this, BASE)))
1339                             Thread.onSpinWait(); // spin to reduce memory traffic
1340                     }
1341                     else if ((t = (ForkJoinTask<?>)
1342                               U.getAndSetReference(a, k, null)) != null) {
1343                         updateBase(nb);
1344                         break;
1345                     }
1346                     else
1347                         b = base;


1348                 }
1349             }
1350             return t;
1351         }
1352 
1353         /**
1354          * Takes next task, if one exists, using configured mode.

1355          */
1356         final ForkJoinTask<?> nextLocalTask() {
1357             return (config & FIFO) == 0 ? localPop() : localPoll();
1358         }
1359 
1360         /**
1361          * Pops the given task only if it is at the current top.
1362          * @param task the task. Caller must ensure non-null.
1363          * @param internal if caller owns this queue
1364          */
1365         final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1366             boolean taken = false;
1367             ForkJoinTask<?>[] a = array;
1368             int p = top, s = p - 1, cap; long k;
1369             if (a != null && (cap = a.length) > 0 &&
1370                 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1371                 (internal || tryLockPhase())) {
1372                 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1373                     taken = true;
1374                     updateTop(s);
1375                 }
1376                 if (!internal)
1377                     unlockPhase();

1412                 Object u = U.getReference(         // next slot
1413                     a, slotOffset((cap - 1) & (nb = b + 1)));
1414                 if (base != b)                     // inconsistent
1415                     ;
1416                 else if (t == null) {
1417                     if (u == null && top - b <= 0)
1418                         break;                     // empty
1419                     if (pb == b)
1420                         Thread.onSpinWait();       // stalled
1421                 }
1422                 else if (U.compareAndSetReference(a, k, t, null)) {
1423                     updateBase(nb);
1424                     return t;
1425                 }
1426             }
1427             return null;
1428         }
1429 
1430         // specialized execution methods
1431 
1432         /*
 1433          * Two versions (lifo and fifo) of top-level execution, split
1434          * across modes to better isolate task dispatch and local
1435          * processing from top-level scheduling.
1436          */
1437         final void topLevelExecLifo(ForkJoinTask<?> task) {
1438             while (task != null) {
1439                 task.doExec();
1440                 task = localPop();
1441             }
1442         }
1443 
1444         final void topLevelExecFifo(ForkJoinTask<?> task) {
1445             while (task != null) {
1446                 task.doExec();
1447                 task = localPoll();
1448             }
1449         }
1450 
1451         /**
1452          * Deep form of tryUnpush: Traverses from top and removes and
1453          * runs task if present.
1454          */
1455         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1456             ForkJoinTask<?>[] a = array;
1457             int b = base, p = top, s = p - 1, d = p - b, cap;
1458             if (a != null && (cap = a.length) > 0) {
1459                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1460                     long k; boolean taken;
1461                     ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1462                         a, k = slotOffset(i & m));
1463                     if (t == null)
1464                         break;
1465                     if (t == task) {
1466                         if (!internal && !tryLockPhase())
1467                             break;                  // fail if locked

1557                 }
1558                 else if (!(t instanceof CompletableFuture
1559                            .AsynchronousCompletionTask))
1560                     break;
1561                 if (blocker != null && blocker.isReleasable())
1562                     break;
1563                 if (base == b && t != null &&
1564                     U.compareAndSetReference(a, k, t, null)) {
1565                     updateBase(b + 1);
1566                     t.doExec();
1567                 }
1568             }
1569         }
1570 
1571         // misc
1572 
1573         /**
1574          * Cancels all local tasks. Called only by owner.
1575          */
1576         final void cancelTasks() {
1577             for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
1578                 try {
1579                     t.cancel(false);
1580                 } catch (Throwable ignore) {
1581                 }
1582             }
1583         }
1584 
1585         /**
1586          * Returns true if internal and not known to be blocked.
1587          */
1588         final boolean isApparentlyUnblocked() {
1589             Thread wt; Thread.State s;
1590             return ((wt = owner) != null && (phase & IDLE) != 0 &&
1591                     (s = wt.getState()) != Thread.State.BLOCKED &&
1592                     s != Thread.State.WAITING &&
1593                     s != Thread.State.TIMED_WAITING);
1594         }
1595 
1596         static {
1597             U = Unsafe.getUnsafe();

1759         return false;
1760     }
1761 
1762     /**
1763      * Provides a name for ForkJoinWorkerThread constructor.
1764      */
1765     final String nextWorkerThreadName() {
1766         String prefix = workerNamePrefix;
1767         long tid = incrementThreadIds() + 1L;
1768         if (prefix == null) // commonPool has no prefix
1769             prefix = "ForkJoinPool.commonPool-worker-";
1770         return prefix.concat(Long.toString(tid));
1771     }
1772 
1773     /**
1774      * Finishes initializing and records internal queue.
1775      *
1776      * @param w caller's WorkQueue
1777      */
1778     final void registerWorker(WorkQueue w) {
1779         if (w != null) {
1780             w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1781             ThreadLocalRandom.localInit();
1782             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1783             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1784             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1785             long stop = lockRunState() & STOP;
1786             try {
1787                 WorkQueue[] qs; int n;
1788                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1789                     for (int k = n, m = n - 1;  ; id += 2) {
1790                         if (qs[id &= m] == null)
1791                             break;
1792                         if ((k -= 2) <= 0) {
1793                             id |= n;
1794                             break;
1795                         }
1796                     }
1797                     w.phase = id | phaseSeq;    // now publishable
1798                     if (id < n)
1799                         qs[id] = w;
1800                     else {                      // expand

1838             do {} while (c != (c = compareAndExchangeCtl(
1839                                    c, ((RC_MASK & (c - RC_UNIT)) |
1840                                        (TC_MASK & (c - TC_UNIT)) |
1841                                        (LMASK & c)))));
1842         }
1843         if (phase != 0 && w != null) {     // remove index unless terminating
1844             long ns = w.nsteals & 0xffffffffL;
1845             if ((runState & STOP) == 0L) {
1846                 WorkQueue[] qs; int n, i;
1847                 if ((lockRunState() & STOP) == 0L &&
1848                     (qs = queues) != null && (n = qs.length) > 0 &&
1849                     qs[i = phase & SMASK & (n - 1)] == w) {
1850                     qs[i] = null;
1851                     stealCount += ns;      // accumulate steals
1852                 }
1853                 unlockRunState();
1854             }
1855         }
1856         if ((tryTerminate(false, false) & STOP) == 0L &&
1857             phase != 0 && w != null && w.source != DROPPED) {

1858             w.cancelTasks();               // clean queue
1859             signalWork(null, 0);           // possibly replace
1860         }
1861         if (ex != null)
1862             ForkJoinTask.rethrow(ex);
1863     }
1864 
1865     /**
1866      * Releases an idle worker, or creates one if not enough exist,
 1867      * giving up if array a is nonnull and task at a[k] is already taken.
1868      */
1869     final void signalWork(ForkJoinTask<?>[] a, int k) {
1870         int pc = parallelism;
1871         for (long c = ctl;;) {
1872             WorkQueue[] qs = queues;
1873             long ac = (c + RC_UNIT) & RC_MASK, nc;
1874             int sp = (int)c, i = sp & SMASK;
1875             if ((short)(c >>> RC_SHIFT) >= pc)
1876                 break;
1877             if (qs == null)
1878                 break;
1879             if (qs.length <= i)
1880                 break;
1881             WorkQueue w = qs[i], v = null;
1882             if (sp == 0) {
1883                 if ((short)(c >>> TC_SHIFT) >= pc)
1884                     break;
1885                 nc = ((c + TC_UNIT) & TC_MASK) | ac;
1886             }
1887             else if ((v = w) == null)
1888                 break;
1889             else
1890                 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
1891             if (a != null && k < a.length && k >= 0 && a[k] == null)
1892                 break;
1893             if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
1894                 if (v == null)
1895                     createWorker();
1896                 else {
1897                     v.phase = sp;
1898                     if (v.parking != 0)
1899                         U.unpark(v.owner);
1900                 }
1901                 break;
1902             }
1903         }
1904     }
1905 
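          /*
           * Sketch of the ctl decomposition read above (documented in
           * full earlier in this file): the top 16 bits hold the
           * release count, read as (short)(c >>> RC_SHIFT); the next
           * 16 bits the total count, (short)(c >>> TC_SHIFT); and the
           * low 32 bits, (int)c, the phase of the top idle worker on
           * the Treiber stack, with stackPred links forming its spine.
           */
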
1906     /**
1907      * Releases all waiting workers. Called only during shutdown.
1908      */
1909     private void releaseWaiters() {
1910         for (long c = ctl;;) {
1911             WorkQueue[] qs; WorkQueue v; int sp, i;
1912             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1957                 else if ((e & SHUTDOWN) == 0)
1958                     return 0;
1959                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960                     return 0;
1961                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962                     return 1;                             // enable termination
1963                 else
1964                     break;                                // restart
1965             }
1966         }
1967     }
1968 
1969     /**
1970      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971      * See above for explanation.
1972      *
1973      * @param w caller's WorkQueue (may be null on failed initialization)
1974      */
1975     final void runWorker(WorkQueue w) {
1976         if (w != null) {
1977             int phase = w.phase, r = w.stackPred;         // seed from registerWorker
1978             int fifo = (int)config & FIFO;
1979             int nsteals = 0;                              // shadow w.nsteals
1980             boolean rescan = true;
1981             WorkQueue[] qs; int n;
1982             while ((rescan || (phase = deactivate(w, phase)) != 0) &&
1983                    (runState & STOP) == 0L && (qs = queues) != null &&
1984                    (n = qs.length) > 0) {
1985                 rescan = false;
1986                 int i = r, step = (r >>> 16) | 1;
1987                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1988                 scan: for (int j = -n; j < n; ++j, i += step) { // 2 passes
1989                     WorkQueue q; int qid;
1990                     if ((q = qs[qid = i & (n - 1)]) != null) {
1991                         for (;;) {                        // poll queue q
1992                             ForkJoinTask<?>[] a; int cap, b, m, nb, nk;
1993                             if ((a = q.array) == null || (cap = a.length) <= 0)
1994                                 break;
1995                             long bp = slotOffset((m = cap - 1) & (b = q.base));
1996                             long np = slotOffset(nk = m & (nb = b + 1));
1997                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1998                                 U.getReferenceAcquire(a, bp);
1999                             if (q.base != b || U.getReference(a, bp) != t)
2000                                 continue;                 // inconsistent
2001                             if (t == null) {
2002                                 if (rescan) {             // end of run
2003                                     w.nsteals = nsteals;
2004                                     break scan;
2005                                 }
2006                                 if (U.getReference(a, np) != null) {
2007                                     rescan = true;
2008                                     break scan;           // stalled; reorder scan
2009                                 }
2010                                 if (j >= 0 && q.top - b > 0) {
2011                                     rescan = true;
2012                                     break scan;           // size check on 2nd pass
2013                                 }
2014                                 break;                    // probably empty
2015                             }
2016                             if ((phase & IDLE) != 0)      // can't take yet
2017                                 phase = tryReactivate(w, phase);
2018                             else if (U.compareAndSetReference(a, bp, t, null)) {
2019                                 q.base = nb;
2020                                 Object nt = U.getReferenceAcquire(a, np);
2021                                 if (!rescan) {            // begin run
2022                                     rescan = true;
2023                                     w.source = qid;
2024                                 }
2025                                 ++nsteals;
2026                                 if (nt != null &&         // confirm a[nk]
2027                                     U.getReference(a, np) == nt)
2028                                     signalWork(a, nk);    // propagate
2029                                 if (fifo != 0)            // run t & its subtasks
2030                                     w.topLevelExecFifo(t);
2031                                 else
2032                                     w.topLevelExecLifo(t);
2033                             }
2034                         }
2035                     }
2036                 }





2037             }
2038         }
2039     }
2040 
2041     /**
2042      * If active, tries to deactivate worker, keeping active on contention,
2043      * else awaits signal or termination.
2044      *
2045      * @param w the work queue
2046      * @param phase w's currently known phase
2047      * @return current phase or 0 on exit
2048      */
2049     private int deactivate(WorkQueue w, int phase) {
2050         if ((phase & IDLE) == 0 && w != null) {
2051             int idlePhase = phase | IDLE;
2052             long pc = ctl, e;
2053             long qc = ((phase + (IDLE << 1)) & LMASK) | ((pc - RC_UNIT) & UMASK);
2054             w.stackPred = (int)pc;                // set ctl stack link
2055             w.phase = idlePhase;                  // try to enqueue
2056             if (!compareAndSetCtl(pc, qc))
2057                 w.phase = phase;                  // back out on contention
2058             else {
2059                 phase = idlePhase;
2060                 if ((qc & RC_MASK) <= 0L && ((e = runState) & SHUTDOWN) != 0L &&
2061                     (e & STOP) == 0L)
2062                     quiescent();                  // check quiescent termination
2063             }














2064         }
2065         else
2066             phase = awaitWork(w, phase);
2067         return phase;
2068     }
2069 
2070     /**
 2071      * Reactivates worker w if it is currently top of ctl stack.
2072      *
2073      * @param w the work queue
2074      * @param phase w's currently known (idle) phase
2075      * @return currently known phase on exit
2076      */
2077     private int tryReactivate(WorkQueue w, int phase) {
2078         int activePhase = phase + IDLE; long c;
2079         if (w != null && (phase = w.phase) != activePhase &&
2080             (int)(c = ctl) == activePhase &&
2081             compareAndSetCtl(c, (w.stackPred & LMASK) | ((c + RC_UNIT) & UMASK)))
2082             phase = w.phase = activePhase;
2083         return phase;
2084     }
2085 
2086     /**
2087      * Awaits signal or termination.
2088      *
2089      * @param w the work queue
2090      * @param phase w's currently known (idle) phase
2091      * @return current phase or 0 on exit
2092      */
2093     private int awaitWork(WorkQueue w, int phase) {
2094         int idle = 1, activePhase = phase + IDLE;
2095         if ((runState & STOP) == 0L && w != null &&
2096             (idle = w.phase - activePhase) != 0) {
2097             WorkQueue[] qs;
2098             int cfg = w.config;
2099             long waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
2100             int n = ((qs = queues) == null) ? 0 : qs.length;
2101             int spins = Math.max((n << 1) | (n - 1), SPIN_WAITS);
2102             long deadline = waitTime + System.currentTimeMillis();
2103             if ((cfg & CLEAR_TLS) != 0 &&     // instanceof check always true
2104                 Thread.currentThread() instanceof ForkJoinWorkerThread f)
2105                 f.resetThreadLocals();        // clear while accessing thread state
2106             LockSupport.setCurrentBlocker(this);
2107             for (;;) {
2108                 Thread.interrupted();         // clear status
2109                 int s = spins;
2110                 while ((idle = w.phase - activePhase) != 0 && --s != 0)
2111                     Thread.onSpinWait();      // spin before/between parks
2112                 if (idle == 0)
2113                     break;
2114                 if ((runState & STOP) != 0L)
2115                     break;
2116                 boolean trimmable = false;    // use timed wait if trimmable
2117                 long d = 0L, c;
2118                 if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) {
2119                     if (deadline - System.currentTimeMillis() <= TIMEOUT_SLOP) {
2120                         if (tryTrim(w, c, activePhase))
2121                             break;
2122                         continue;             // lost race to trim



2123                     }
2124                     d = deadline;
2125                     trimmable = true;
2126                 }
2127                 w.parking = 1;                // enable unpark and recheck
2128                 if ((idle = w.phase - activePhase) != 0)
2129                     U.park(trimmable, d);
2130                 w.parking = 0;                // close unpark window
2131                 if (idle == 0 || (idle = w.phase - activePhase) == 0)
2132                     break;
2133             }
2134             LockSupport.setCurrentBlocker(null);
2135         }
2136         return (idle == 0) ? activePhase : 0;
2137     }
2138 
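          /*
           * Timing sketch for the trim path above: with the default
           * keepAlive (60_000 ms) and TIMEOUT_SLOP (20 ms), deadline =
           * now + keepAlive at entry, and tryTrim is attempted once
           * deadline - now <= 20 while w is the top idle worker and no
           * workers are active. Workers released by tryTrim have
           * source == INVALID_ID, so their waitTime is 0 and they time
           * out immediately, cascading trims while the pool stays idle.
           */
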
2139     /**
2140      * Tries to remove and deregister worker after timeout, and release
2141      * another to do the same unless new tasks are found.

2142      */
2143     private boolean tryTrim(WorkQueue w, long c, int activePhase) {
2144         if (w != null) {
2145             int vp, i; WorkQueue[] vs; WorkQueue v;
2146             long nc = ((w.stackPred & LMASK) |
2147                        ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
2148             if (compareAndSetCtl(c, nc)) {
2149                 w.source = DROPPED;
2150                 w.phase = activePhase;
2151                 if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2152                     vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2153                     compareAndSetCtl(           // try to wake up next waiter
2154                         nc, ((v.stackPred & LMASK) |
2155                              ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
2156                     v.source = INVALID_ID;      // enable cascaded timeouts
2157                     v.phase = vp;
2158                     U.unpark(v.owner);
2159                 }
2160                 return true;




2161             }
2162         }
2163         return false;
2164     }
2165 
2166     /**
2167      * Scans for and returns a polled task, if available.  Used only
2168      * for untracked polls. Begins scan at a random index to avoid
2169      * systematic unfairness.
2170      *
2171      * @param submissionsOnly if true, only scan submission queues
2172      */
2173     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
2174         if ((runState & STOP) == 0L) {
2175             WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
2176             int r = ThreadLocalRandom.nextSecondarySeed();
2177             if (submissionsOnly)                 // even indices only
2178                 r &= ~1;
2179             int step = (submissionsOnly) ? 2 : 1;
2180             if ((qs = queues) != null && (n = qs.length) > 0) {
2181                 for (int i = n; i > 0; i -= step, r += step) {
2182                     if ((q = qs[r & (n - 1)]) != null &&
2183                         (t = q.poll()) != null)

2560         else
2561             return 0;
2562     }
2563 
2564     /**
2565      * Gets and removes a local or stolen task for the given worker.
2566      *
2567      * @return a task, if available
2568      */
2569     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2570         ForkJoinTask<?> t;
2571         if (w == null || (t = w.nextLocalTask()) == null)
2572             t = pollScan(false);
2573         return t;
2574     }
2575 
2576     // External operations
2577 
2578     /**
2579      * Finds and locks a WorkQueue for an external submitter, or
 2580      * throws RejectedExecutionException if shutdown.

2581      * @param rejectOnShutdown true if RejectedExecutionException
2582      *        should be thrown when shutdown
2583      */
2584     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2585         int r;
2586         if ((r = ThreadLocalRandom.getProbe()) == 0) {
2587             ThreadLocalRandom.localInit();   // initialize caller's probe
2588             r = ThreadLocalRandom.getProbe();
2589         }
2590         for (;;) {
2591             WorkQueue q; WorkQueue[] qs; int n, id, i;
2592             if ((qs = queues) == null || (n = qs.length) <= 0)


2593                 break;
2594             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2595                 WorkQueue newq = new WorkQueue(null, id, 0, false);
2596                 lockRunState();
2597                 if (qs[i] == null && queues == qs)
2598                     q = qs[i] = newq;         // else lost race to install


2599                 unlockRunState();





2600             }
2601             if (q != null && q.tryLockPhase()) {
2602                 if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2603                     q.unlockPhase();          // check while q lock held
2604                     break;
2605                 }









2606                 return q;
2607             }
2608             r = ThreadLocalRandom.advanceProbe(r); // move
2609         }
2610         throw new RejectedExecutionException();
2611     }
2612 
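          /*
           * Index-parity note for the method above: EXTERNAL_ID_MASK
           * (0x3ffe) has a clear low bit, so external queues occupy
           * even slots, while registerWorker assigns worker ids of the
           * form (seed << 1) | 1, occupying odd slots. This is what
           * lets pollScan(true) restrict itself to even indices.
           */
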
2613     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2614         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2615         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2616             (wt = (ForkJoinWorkerThread)t).pool == this) {
2617             internal = true;
2618             q = wt.workQueue;
2619         }
2620         else {                     // find and lock queue
2621             internal = false;
2622             q = externalSubmissionQueue(true);
2623         }
2624         q.push(task, signalIfEmpty ? this : null, internal);
2625         return task;
2626     }
2627 












2628     /**
2629      * Returns queue for an external thread, if one exists that has
2630      * possibly ever submitted to the given pool (nonzero probe), or
2631      * null if none.
2632      */
2633     static WorkQueue externalQueue(ForkJoinPool p) {
2634         WorkQueue[] qs; int n;
2635         int r = ThreadLocalRandom.getProbe();
2636         return (p != null && (qs = p.queues) != null &&
2637                 (n = qs.length) > 0 && r != 0) ?
2638             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2639     }
2640 
2641     /**
2642      * Returns external queue for common pool.
2643      */
2644     static WorkQueue commonQueue() {
2645         return externalQueue(common);
2646     }
2647 