
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java


 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule that task producers and
 555      * consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
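As a rough illustration of this Dekker-like rule, consider the following minimal sketch (hypothetical names; it uses VarHandle.fullFence() for exposition, whereas the pool itself uses jdk-internal Unsafe, as noted under Style notes below): each side performs a fully fenced access before reading the other side's state, so at least one side must observe the other's update.

    import java.lang.invoke.VarHandle;
    import java.util.concurrent.atomic.AtomicReference;

    // Minimal sketch (not the pool's code) of the Dekker-like handshake:
    // both sides fence before reading the other's state, so a task
    // publication and an idleness advertisement cannot both be missed.
    class DekkerSketch {
        final AtomicReference<Runnable> slot = new AtomicReference<>();
        volatile boolean consumerIdle;

        void produce(Runnable task) {
            slot.set(task);              // publish the task
            VarHandle.fullFence();       // fully fenced access, then check
            if (consumerIdle)
                unparkConsumer();        // signal only when needed
        }

        Runnable consume() {
            consumerIdle = true;         // advertise idleness
            VarHandle.fullFence();       // fence before deciding to block
            return slot.getAndSet(null); // cannot miss a prior publication
        }

        void unparkConsumer() { /* elided in this sketch */ }
    }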
 561      *
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  Method signalWork and its callers
 564      * try to approximate the unattainable goal of having the right
 565      * number of workers activated for the tasks at hand, but must err
 566      * on the side of too many workers vs too few to avoid stalls:
 567      *
 568      *  * If computations are purely tree structured, it suffices for
 569      *    every worker to activate another when it pushes a task into
 570      *    an empty queue, resulting in O(log(#threads)) steps to full
 571      *    activation. Emptiness must be conservatively approximated,
 572      *    which may result in unnecessary signals.  Also, to reduce
 573      *    resource usages in some cases, at the expense of slower
 574      *    startup in others, activation of an idle thread is preferred
 575      *    over creating a new one, here and elsewhere.
 576      *
 577      *  * At the other extreme, if "flat" tasks (those that do not in
 578      *    turn generate others) come in serially from only a single
 579      *    producer, each worker taking a task from a queue should
 580      *    propagate a signal if there are more tasks in that
 581      *    queue. This is equivalent to, but generally faster than,
 582      *    arranging that the stealer take multiple tasks, re-push one
 583      *    or more on its own queue, and signal (because its queue is
 584      *    empty), also resulting in logarithmic full activation time
 585      *    (see the sketch after this list). If tasks do not engage in
 586      *    unbounded loops based on the actions of other workers with
 587      *    unknown dependencies, this form of propagation can be
 588      *    limited to one signal per activation (phase change). We
 589      *    distinguish the cases by further signalling only if the task
 590      *    is an InterruptibleTask (see below), which is the only
 591      *    supported form of task that may do so.
 592      *
 593      *  * Because we don't know about usage patterns (or most commonly,
 594      *    mixtures), we use both approaches, which present even more
 595      *    opportunities to over-signal. (Failure to distinguish these
 596      *    cases in terms of submission methods was arguably an early
 597      *    design mistake.)  Note that in either of these contexts,
 598      *    signals may be (and often are) unnecessary because active
 599      *    workers continue scanning after running tasks without the
 600      *    need to be signalled (which is one reason work stealing is
 601      *    often faster than alternatives), so additional workers
 602      *    aren't needed.
 603      *
 604      *  * For rapidly branching tasks that require full pool resources,
 605      *    oversignalling is OK, because signalWork will soon have no
 606      *    more workers to create or reactivate. But for others (mainly
 607      *    externally submitted tasks), overprovisioning may cause very
 608      *    noticeable slowdowns due to contention and resource
 609      *    wastage. We reduce impact by deactivating workers when
 610      *    queues don't have accessible tasks, but reactivating and
 611      *    rescanning if other tasks remain.
 612      *
 613      *  * Despite these, signal contention and overhead effects still
 614      *    occur during ramp-up and ramp-down of small computations.
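As a toy model of the logarithmic-activation claims above (a hedged sketch, not pool code; the constant n is illustrative): if every active worker signals at most one more worker per step, the active count doubles each step, so reaching n workers takes about log2(n) steps.

    // Toy model of O(log(#threads)) activation: each step, every active
    // worker signals one more, doubling the number of active workers.
    public class ActivationSteps {
        public static void main(String[] args) {
            int n = 16;                   // illustrative target worker count
            int active = 1, steps = 0;
            while (active < n) {
                active += active;         // each active worker signals one
                steps++;
            }
            System.out.println(steps + " steps to activate " + n + " workers"); // 4
        }
    }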
 615      *
 616      * Scanning. Method runWorker performs top-level scanning for (and
 617      * execution of) tasks by polling a pseudo-random permutation of
 618      * the array (by starting at a given index, and using a constant
 619      * cyclically exhaustive stride).  It uses the same basic polling
 620      * method as WorkQueue.poll(), but restarts with a different
 621      * permutation on each invocation.  The pseudorandom generator
 622      * need not have high-quality statistical properties in the long
 623      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 624      * from ThreadLocalRandom probes, which are cheap and
 625      * suffice. Each queue's polling attempts to avoid becoming stuck
 626      * when other scanners/pollers stall.  Scans do not otherwise
 627      * explicitly take into account core affinities, loads, cache
 628      * localities, etc. However, they do exploit temporal locality
 629      * (which usually approximates these) by preferring to re-poll
 630      * from the same queue after a successful poll before trying
 631      * others, which also reduces bookkeeping, cache traffic, and
 632      * scanning overhead. But it also reduces fairness, which is
 633      * partially counteracted by giving up on detected interference
 634      * (which also reduces contention when too many workers try to
 635      * take small tasks from the same queue).
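The stride arithmetic can be demonstrated in isolation (a hedged demo with illustrative constants, not the actual scan loop, which appears in runWorker below): on a power-of-two-length table, any odd stride is cyclically exhaustive, and a XorShift step yields a different permutation per scan.

    // Demonstrates that an odd stride over a power-of-two table visits
    // every index exactly once; the XorShift step reseeds the permutation.
    public class ScanOrder {
        public static void main(String[] args) {
            int n = 8;                                  // power-of-two table size
            int r = 0x9E3779B9;                         // arbitrary nonzero seed
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5;   // Marsaglia XorShift
            int step = (r >>> 16) | 1;                  // odd => gcd(step, n) == 1
            for (int l = n, i = r; l > 0; --l, i += step)
                System.out.print((i & (n - 1)) + " ");  // prints each of 0..7 once
        }
    }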
 636      *
 637      * Deactivation. When no tasks are found by a worker in runWorker,
 638      * it tries to deactivate(), giving up (and rescanning) on "ctl"
 639      * contention. To avoid missed signals, the method rescans and
 640      * reactivates if there may have been a missed signal during
 641      * deactivation. To reduce false-alarm reactivations while doing
 642      * so, we scan multiple times (analogously to method quiescent())
 643      * before trying to reactivate.  Because idle workers are often
 644      * not yet blocked (parked), we use a WorkQueue field to advertise
 645      * that a waiter actually needs unparking upon signal.
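A hedged sketch of that advertisement (hypothetical fields; the real counterparts are WorkQueue.parking, awaitWork, and signalWork shown later): the waiter publishes its intent to park before blocking, and the signaller issues an unpark only when that flag is set.

    import java.util.concurrent.locks.LockSupport;

    // Hedged sketch of the parking advertisement (names are hypothetical).
    class ParkSketch {
        volatile int parking;          // nonzero while owner may be parked
        volatile boolean signalled;    // stand-in for a phase change
        Thread owner;

        void await() {
            owner = Thread.currentThread();
            parking = 1;               // advertise before blocking
            while (!signalled)         // recheck; spurious wakeups possible
                LockSupport.park(this);
            parking = 0;
        }

        void signal() {
            signalled = true;          // analogous to writing an active phase
            if (parking != 0)          // skip the unpark call if not parked
                LockSupport.unpark(owner);
        }
    }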
 646      *
 647      * Quiescence. Workers scan looking for work, giving up when they
 648      * don't find any, without being sure that none are available.
 649      * However, some required functionality relies on consensus about
 650      * quiescence (also termination, discussed below). The count
 651      * fields in ctl allow accurate discovery of states in which all
 652      * workers are idle.  However, because external (asynchronous)
 653      * submitters are not part of this vote, these mechanisms
 654      * themselves do not guarantee that the pool is in a quiescent
 655      * state with respect to methods isQuiescent, shutdown (which
 656      * begins termination when quiescent), helpQuiesce, and indirectly
 657      * others including tryCompensate. Method quiescent() is used in
 658      * all of these contexts. It provides checks that all workers are
 659      * idle and there are no submissions that they could poll if they
 660      * were not idle, retrying on inconsistent reads of queues and
 661      * using the runState seqLock to retry on queue array updates.
 662      * (It also reports quiescence if the pool is terminating.) A true
 663      * report means only that there was a moment at which quiescence
 664      * held.  False negatives are inevitable (for example when queue
 665      * indices lag updates, as described above), which is accommodated

 875      * ====================
 876      *
 877      * Regular ForkJoinTasks manage task cancellation (method cancel)
 878      * independently from the interrupt status of threads running
 879      * tasks.  Interrupts are issued internally only while
 880      * terminating, to wake up workers and cancel queued tasks.  By
 881      * default, interrupts are cleared only when necessary to ensure
 882      * that calls to LockSupport.park do not loop indefinitely (park
 883      * returns immediately if the current thread is interrupted).
 884      *
 885      * To comply with ExecutorService specs, we use subclasses of
 886      * abstract class InterruptibleTask for tasks that require
 887      * stronger interruption and cancellation guarantees.  External
 888      * submitters never run these tasks, even if in the common pool
 889      * (as indicated by ForkJoinTask.noUserHelp status bit).
 890      * InterruptibleTasks include a "runner" field (implemented
 891      * similarly to FutureTask) to support cancel(true).  Upon pool
 892      * shutdown, runners are interrupted so they can cancel. Since
 893      * external joining callers never run these tasks, they must await
 894      * cancellation by others, which can occur along several different
 895      * paths. The inability to rely on caller-runs may also require
 896      * extra signalling (resulting in scanning and contention) so is
 897      * done only conditionally in methods push and runWorker.
 898      *
 899      * Across these APIs, rules for reporting exceptions for tasks
 900      * with results accessed via join() differ from those via get(),
 901      * which differ from those invoked using pool submit methods by
 902      * non-workers (which comply with Future.get() specs). Internal
 903      * usages of ForkJoinTasks ignore interrupt status when executing
 904      * or awaiting completion.  Otherwise, reporting task results or
 905      * exceptions is preferred to throwing InterruptedExceptions,
 906      * which are in turn preferred to timeouts. Similarly, completion
 907      * status is preferred to reporting cancellation.  Cancellation is
 908      * reported as an unchecked exception by join(), and by worker
 909      * calls to get(), but is otherwise wrapped in a (checked)
 910      * ExecutionException.
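For example (behavior guaranteed by the ForkJoinTask specifications): a task failing with an unchecked exception rethrows it directly from join(), but get() wraps it in a checked ExecutionException.

    import java.util.concurrent.*;

    public class JoinVsGet {
        public static void main(String[] args) throws InterruptedException {
            ForkJoinTask<?> f = ForkJoinPool.commonPool().submit(
                (Callable<Object>) () -> { throw new IllegalStateException("boom"); });
            try { f.join(); }                          // unchecked rethrow
            catch (RuntimeException e) { System.out.println("join: " + e); }
            try { f.get(); }                           // Future-style wrapping
            catch (ExecutionException e) { System.out.println("get cause: " + e.getCause()); }
        }
    }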
 911      *
 912      * Worker Threads cannot be VirtualThreads, as enforced by
 913      * requiring ForkJoinWorkerThreads in factories.  There are
 914      * several constructions relying on this.  However as of this
 915      * writing, virtual thread bodies are by default run as some form
 916      * of InterruptibleTask.
 917      *

 944      * the placement of their fields. Cache misses and contention due
 945      * to false-sharing have been observed to slow down some programs
 946      * by more than a factor of four. Effects may vary across initial
 947      * memory configurations, applications, and different garbage
 948      * collectors and GC settings, so there is no perfect solution.
 949      * Too much isolation may generate more cache misses in common
 950      * cases (because some fields and slots are usually read at the
 951      * same time). The @Contended annotation provides only rough
 952      * control (for good reason). Similarly for relying on fields
 953      * being placed in size-sorted declaration order.
 954      *
 955      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 956      * most false-sharing misses with respect to other fields. Also,
 957      * ForkJoinPool fields are ordered such that fields less prone to
 958      * contention effects are first, offsetting those that otherwise
 959      * would be, while also reducing total footprint vs using
 960      * multiple @Contended regions, which tends to slow down
 961      * less-contended applications. To help arrange this, some
 962      * non-reference fields are declared as "long" even when ints or
 963      * shorts would suffice.  For class WorkQueue, an
 964      * embedded @Contended region segregates fields most heavily
 965      * updated by owners from those most commonly read by stealers or
 966      * other management.
 967      *
 968      * Initial sizing and resizing of WorkQueue arrays is an even more
 969      * delicate tradeoff because the best strategy systematically
 970      * varies across garbage collectors. Small arrays are better for
 971      * locality and reduce GC scan time, but large arrays reduce both
 972      * direct false-sharing and indirect cases due to GC bookkeeping
 973      * (cardmarks etc), and reduce the number of resizes, which are
 974      * not especially fast because they require atomic transfers.
 975      * Currently, arrays for workers are initialized to be just large
 976      * enough to avoid resizing in most tree-structured tasks, but
 977      * larger for external queues where both false-sharing problems
 978      * and the need for resizing are more common. (Maintenance note:
 979      * any changes in fields, queues, or their uses, or JVM layout
 980      * policies, must be accompanied by re-evaluation of these
 981      * placement and sizing decisions.)
 982      *
 983      * Style notes
 984      * ===========
 985      *
 986      * Memory ordering relies mainly on atomic operations (CAS,
 987      * getAndSet, getAndAdd) along with moded accesses. These use
 988      * jdk-internal Unsafe for atomics and special memory modes,
 989      * rather than VarHandles, to avoid initialization dependencies in
 990      * other jdk components that require early parallelism.  This can
 991      * be awkward and ugly, but also reflects the need to control
 992      * outcomes across the unusual cases that arise in very racy code
 993      * with very few invariants. All atomic task slot updates use
 994      * Unsafe operations requiring offset positions, not indices, as
 995      * computed by method slotOffset. All fields are read into locals
 996      * before use, and null-checked if they are references, even if
 997      * they can never be null under current usages. Usually,
 998      * computations (held in local variables) are defined as soon as
 999      * logically enabled, sometimes to convince compilers that they
1000      * may be performed despite memory ordering constraints.  Array
1001      * accesses using masked indices include checks (that are always

1044      */
1045     static final long DEFAULT_KEEPALIVE = 60_000L;
1046 
1047     /**
1048      * Undershoot tolerance for idle timeouts, also serving as the
1049      * minimum allowed timeout value.
1050      */
1051     static final long TIMEOUT_SLOP = 20L;
1052 
1053     /**
1054      * The default value for common pool maxSpares.  Overridable using
1055      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1056      * system property.  The default value is far in excess of normal
1057      * requirements, but also far short of maximum capacity and typical OS
1058      * thread limits, so allows JVMs to catch misuse/abuse before
1059      * running out of resources needed to do so.
1060      */
1061     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1062 
1063     /**
1064      * Initial capacity of work-stealing queue array for workers.
1065      * Must be a power of two, at least 2. See above.
1066      */
1067     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1068 
1069     /**
1070      * Initial capacity of work-stealing queue array for external queues.
1071      * Must be a power of two, at least 2. See above.
1072      */
1073     static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
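Because capacities are powers of two, slot indices wrap with a mask instead of a modulus; a small worked example (illustrative values):

    public class MaskDemo {
        public static void main(String[] args) {
            int cap  = 1 << 6;                // INITIAL_QUEUE_CAPACITY == 64
            int mask = cap - 1;               // 0x3f
            int top  = 67;                    // indices grow without bound
            System.out.println(top & mask);   // 3: wraps without % or a branch
        }
    }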
1074 
1075     // conversions among short, int, long
1076     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1077     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1078     static final long UMASK           = ~LMASK;      // upper 32 bits
1079 
1080     // masks and sentinels for queue indices
1081     static final int MAX_CAP          = 0x7fff;   // max # workers
1082     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1083     static final int INVALID_ID       = 0x4000;   // unused external queue id
1084 
1085     // pool.runState bits
1086     static final long STOP            = 1L <<  0;   // terminating
1087     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1088     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1089     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1090     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1091 
1092     // spin/sleep limits for runState locking and elsewhere
1093     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1094     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos

1186     static final class DefaultForkJoinWorkerThreadFactory
1187         implements ForkJoinWorkerThreadFactory {
1188         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1189             return ((pool.workerNamePrefix == null) ? // is commonPool
1190                     new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1191                     new ForkJoinWorkerThread(null, pool, true, false));
1192         }
1193     }
1194 
1195     /**
1196      * Queues supporting work-stealing as well as external task
1197      * submission. See above for descriptions and algorithms.
1198      */
1199     static final class WorkQueue {
1200         // fields declared in order of their likely layout on most VMs
1201         final ForkJoinWorkerThread owner; // null if shared
1202         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
1203         int base;                  // index of next slot for poll
1204         final int config;          // mode bits
1205 
1206         // fields otherwise causing more unnecessary false-sharing cache misses
1207         @jdk.internal.vm.annotation.Contended("w")
1208         int top;                   // index of next slot for push
1209         @jdk.internal.vm.annotation.Contended("w")
1210         volatile int phase;        // versioned active status
1211         @jdk.internal.vm.annotation.Contended("w")
1212         int stackPred;             // pool stack (ctl) predecessor link
1213         @jdk.internal.vm.annotation.Contended("w")
1214         volatile int source;       // source queue id (or DROPPED)
1215         @jdk.internal.vm.annotation.Contended("w")
1216         int nsteals;               // number of steals from other queues
1217         @jdk.internal.vm.annotation.Contended("w")
1218         volatile int parking;      // nonzero if parked in awaitWork
1219 
1220         // Support for atomic operations
1221         private static final Unsafe U;
1222         private static final long PHASE;
1223         private static final long BASE;
1224         private static final long TOP;
1225         private static final long ARRAY;
1226 
1227         final void updateBase(int v) {
1228             U.putIntVolatile(this, BASE, v);
1229         }
1230         final void updateTop(int v) {
1231             U.putIntOpaque(this, TOP, v);
1232         }
1233         final void updateArray(ForkJoinTask<?>[] a) {
1234             U.getAndSetReference(this, ARRAY, a);
1235         }
1236         final void unlockPhase() {
1237             U.getAndAddInt(this, PHASE, IDLE);
1238         }
1239         final boolean tryLockPhase() {    // seqlock acquire
1240             int p;
1241             return (((p = phase) & IDLE) != 0 &&
1242                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1243         }
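An illustrative pairing of these two methods for a shared (external) queue (a hedged fragment; the real call sites, such as push below, unlock explicitly rather than via try/finally):

    if (q.tryLockPhase()) {       // CAS adds IDLE: clears the lock bit
        try {
            // ... update array slots and top while exclusively held ...
        } finally {
            q.unlockPhase();      // getAndAdd(IDLE): sets IDLE, bumps version
        }
    }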
1244 
1245         /**
1246          * Constructor. For internal queues, most fields are initialized
1247          * upon thread start in pool.registerWorker.
1248          */
1249         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1250                   boolean clearThreadLocals) {
1251             array = new ForkJoinTask<?>[owner == null ?
1252                                         INITIAL_EXTERNAL_QUEUE_CAPACITY :
1253                                         INITIAL_QUEUE_CAPACITY];
1254             this.owner = owner;
1255             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1256         }
1257 
1258         /**
1259          * Returns an exportable index (used by ForkJoinWorkerThread).
1260          */
1261         final int getPoolIndex() {
1262             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1263         }
1264 
1265         /**
1266          * Returns the approximate number of tasks in the queue.
1267          */
1268         final int queueSize() {
1269             int unused = phase;             // for ordering effect
1270             return Math.max(top - base, 0); // ignore transient negative
1271         }
1272 
1273         /**
 1274          * Pushes a task. Called only by owner or if already locked.
1275          *
1276          * @param task the task; no-op if null
1277          * @param pool the pool to signal if was previously empty, else null
1278          * @param internal if caller owns this queue
1279          * @throws RejectedExecutionException if array could not be resized
1280          */
1281         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1282             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1283             if ((a = array) != null && (cap = a.length) > 0 && // else disabled
1284                 task != null) {
1285                 int pk = task.noUserHelp() + 1;             // prev slot offset
1286                 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1287                     top = s + 1;
1288                     long pos = slotOffset(m & s);
1289                     if (!internal)
1290                         U.putReference(a, pos, task);       // inside lock
1291                     else
1292                         U.getAndSetReference(a, pos, task); // fully fenced
1293                     if (room == 0)                          // resize
1294                         growArray(a, cap, s);
1295                 }
1296                 if (!internal)
1297                     unlockPhase();
1298                 if (room < 0)
1299                     throw new RejectedExecutionException("Queue capacity exceeded");
1300                 if ((room == 0 || a[m & (s - pk)] == null) &&
1301                     pool != null)
1302                     pool.signalWork();   // may have appeared empty
1303             }
1304         }
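Worked example of the capacity check above: with cap == 8, base b == 2, and top s == 9, room == (8 - 1) - (9 - 2) == 0, so the pushed task takes the last free slot and growArray is invoked; room < 0 can occur only after a failed resize, in which case the push is rejected.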
1305 
1306         /**
1307          * Resizes the queue array unless out of memory.
1308          * @param a old array
1309          * @param cap old array capacity
1310          * @param s current top
1311          */
1312         private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
1313             int newCap = cap << 1;
1314             if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1315                 ForkJoinTask<?>[] newArray = null;
1316                 try {
1317                     newArray = new ForkJoinTask<?>[newCap];
1318                 } catch (OutOfMemoryError ex) {
1319                 }
1320                 if (newArray != null) {               // else throw on next push
1321                     int mask = cap - 1, newMask = newCap - 1;
1322                     for (int k = s, j = cap; j > 0; --j, --k) {
1323                         ForkJoinTask<?> u;            // poll old, push to new
1324                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1325                                  a, slotOffset(k & mask), null)) == null)
1326                             break;                    // lost to pollers
1327                         newArray[k & newMask] = u;
1328                     }
1329                     updateArray(newArray);           // fully fenced
1330                 }
1331             }
1332         }
1333 
1334         /**
1335          * Takes next task, if one exists, in order specified by mode,
1336          * so acts as either local-pop or local-poll. Called only by owner.
1337          * @param fifo nonzero if FIFO mode
1338          */
1339         private ForkJoinTask<?> nextLocalTask(int fifo) {
1340             ForkJoinTask<?> t = null;
1341             ForkJoinTask<?>[] a = array;
1342             int b = base, p = top, cap;
1343             if (p - b > 0 && a != null && (cap = a.length) > 0) {
1344                 for (int m = cap - 1, s, nb;;) {
1345                     if (fifo == 0 || (nb = b + 1) == p) {
1346                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1347                                  a, slotOffset(m & (s = p - 1)), null)) != null)
1348                             updateTop(s);       // else lost race for only task
1349                         break;
1350                     }
1351                     if ((t = (ForkJoinTask<?>)U.getAndSetReference(
1352                              a, slotOffset(m & b), null)) != null) {
1353                         updateBase(nb);
1354                         break;
1355                     }
1356                     while (b == (b = U.getIntAcquire(this, BASE)))
1357                         Thread.onSpinWait();    // spin to reduce memory traffic
1358                     if (p - b <= 0)
1359                         break;
1360                 }
1361             }
1362             return t;
1363         }
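The two orders correspond to opposite ends of a deque; as a hedged analogy using java.util.ArrayDeque (not the pool's array representation):

    import java.util.ArrayDeque;

    public class Modes {
        public static void main(String[] args) {
            ArrayDeque<String> q = new ArrayDeque<>();
            q.push("t1");                        // oldest (base side)
            q.push("t2");                        // newest (top side)
            System.out.println(q.pop());         // LIFO (fifo == 0): t2
            System.out.println(q.pollLast());    // FIFO (fifo != 0): t1
        }
    }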
1364 
1365         /**
1366          * Takes next task, if one exists, using configured mode.
1367          * (Always internal, never called for Common pool.)
1368          */
1369         final ForkJoinTask<?> nextLocalTask() {
1370             return nextLocalTask(config & FIFO);
1371         }
1372 
1373         /**
1374          * Pops the given task only if it is at the current top.
1375          * @param task the task. Caller must ensure non-null.
1376          * @param internal if caller owns this queue
1377          */
1378         final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1379             boolean taken = false;
1380             ForkJoinTask<?>[] a = array;
1381             int p = top, s = p - 1, cap; long k;
1382             if (a != null && (cap = a.length) > 0 &&
1383                 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1384                 (internal || tryLockPhase())) {
1385                 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1386                     taken = true;
1387                     updateTop(s);
1388                 }
1389                 if (!internal)
1390                     unlockPhase();

1431                         break;                     // empty
1432                     if (pb == b)
1433                         Thread.onSpinWait();       // stalled
1434                 }
1435                 else if (U.compareAndSetReference(a, k, t, null)) {
1436                     updateBase(nb);
1437                     return t;
1438                 }
1439             }
1440             return null;
1441         }
1442 
1443         // specialized execution methods
1444 
1445         /**
1446          * Runs the given task, as well as remaining local tasks.
1447          */
1448         final void topLevelExec(ForkJoinTask<?> task, int fifo) {
1449             while (task != null) {
1450                 task.doExec();
1451                 task = nextLocalTask(fifo);
1452             }
1453         }
1454 
1455         /**
1456          * Deep form of tryUnpush: Traverses from top and removes and
1457          * runs task if present.
1458          */
1459         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1460             ForkJoinTask<?>[] a = array;
1461             int b = base, p = top, s = p - 1, d = p - b, cap;
1462             if (a != null && (cap = a.length) > 0) {
1463                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1464                     long k; boolean taken;
1465                     ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1466                         a, k = slotOffset(i & m));
1467                     if (t == null)
1468                         break;
1469                     if (t == task) {
1470                         if (!internal && !tryLockPhase())
1471                             break;                  // fail if locked

1561                 }
1562                 else if (!(t instanceof CompletableFuture
1563                            .AsynchronousCompletionTask))
1564                     break;
1565                 if (blocker != null && blocker.isReleasable())
1566                     break;
1567                 if (base == b && t != null &&
1568                     U.compareAndSetReference(a, k, t, null)) {
1569                     updateBase(b + 1);
1570                     t.doExec();
1571                 }
1572             }
1573         }
1574 
1575         // misc
1576 
1577         /**
1578          * Cancels all local tasks. Called only by owner.
1579          */
1580         final void cancelTasks() {
1581             for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
1582                 try {
1583                     t.cancel(false);
1584                 } catch (Throwable ignore) {
1585                 }
1586             }
1587         }
1588 
1589         /**
1590          * Returns true if internal and not known to be blocked.
1591          */
1592         final boolean isApparentlyUnblocked() {
1593             Thread wt; Thread.State s;
1594             return ((wt = owner) != null && (phase & IDLE) != 0 &&
1595                     (s = wt.getState()) != Thread.State.BLOCKED &&
1596                     s != Thread.State.WAITING &&
1597                     s != Thread.State.TIMED_WAITING);
1598         }
1599 
1600         static {
1601             U = Unsafe.getUnsafe();

1763         return false;
1764     }
1765 
1766     /**
1767      * Provides a name for ForkJoinWorkerThread constructor.
1768      */
1769     final String nextWorkerThreadName() {
1770         String prefix = workerNamePrefix;
1771         long tid = incrementThreadIds() + 1L;
1772         if (prefix == null) // commonPool has no prefix
1773             prefix = "ForkJoinPool.commonPool-worker-";
1774         return prefix.concat(Long.toString(tid));
1775     }
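For example, the common pool's workers are named "ForkJoinPool.commonPool-worker-1", "ForkJoinPool.commonPool-worker-2", and so on, while other pools use a prefix carrying the pool sequence number, e.g. "ForkJoinPool-1-worker-1".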
1776 
1777     /**
1778      * Finishes initializing and records internal queue.
1779      *
1780      * @param w caller's WorkQueue
1781      */
1782     final void registerWorker(WorkQueue w) {
1783         if (w != null && (runState & STOP) == 0L) {
1784             ThreadLocalRandom.localInit();
1785             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1786             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1787             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1788             long stop = lockRunState() & STOP;
1789             try {
1790                 WorkQueue[] qs; int n;
1791                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1792                     for (int k = n, m = n - 1;  ; id += 2) {
1793                         if (qs[id &= m] == null)
1794                             break;
1795                         if ((k -= 2) <= 0) {
1796                             id |= n;
1797                             break;
1798                         }
1799                     }
1800                     w.phase = id | phaseSeq;    // now publishable
1801                     if (id < n)
1802                         qs[id] = w;
1803                     else {                      // expand

1841             do {} while (c != (c = compareAndExchangeCtl(
1842                                    c, ((RC_MASK & (c - RC_UNIT)) |
1843                                        (TC_MASK & (c - TC_UNIT)) |
1844                                        (LMASK & c)))));
1845         }
1846         if (phase != 0 && w != null) {     // remove index unless terminating
1847             long ns = w.nsteals & 0xffffffffL;
1848             if ((runState & STOP) == 0L) {
1849                 WorkQueue[] qs; int n, i;
1850                 if ((lockRunState() & STOP) == 0L &&
1851                     (qs = queues) != null && (n = qs.length) > 0 &&
1852                     qs[i = phase & SMASK & (n - 1)] == w) {
1853                     qs[i] = null;
1854                     stealCount += ns;      // accumulate steals
1855                 }
1856                 unlockRunState();
1857             }
1858         }
1859         if ((tryTerminate(false, false) & STOP) == 0L &&
1860             phase != 0 && w != null && w.source != DROPPED) {
1861             signalWork();                  // possibly replace
1862             w.cancelTasks();               // clean queue
1863         }
1864         if (ex != null)
1865             ForkJoinTask.rethrow(ex);
1866     }
1867 
1868     /**
1869      * Releases an idle worker, or creates one if not enough exist.
1870      */
1871     final void signalWork() {
1872         int pc = parallelism;
1873         for (long c = ctl;;) {
1874             WorkQueue[] qs = queues;
1875             long ac = (c + RC_UNIT) & RC_MASK, nc;
1876             int sp = (int)c, i = sp & SMASK;
1877             if ((short)(c >>> RC_SHIFT) >= pc)
1878                 break;
1879             if (qs == null)
1880                 break;
1881             if (qs.length <= i)
1882                 break;
1883             WorkQueue w = qs[i], v = null;
1884             if (sp == 0) {
1885                 if ((short)(c >>> TC_SHIFT) >= pc)
1886                     break;
1887                 nc = ((c + TC_UNIT) & TC_MASK);
1888             }
1889             else if ((v = w) == null)
1890                 break;
1891             else
1892                 nc = (v.stackPred & LMASK) | (c & TC_MASK);
1893             if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
1894                 if (v == null)
1895                     createWorker();
1896                 else {
1897                     v.phase = sp;
1898                     if (v.parking != 0)
1899                         U.unpark(v.owner);
1900                 }
1901                 break;
1902             }
1903         }
1904     }
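The released waiter comes from a Treiber stack of idle workers threaded through ctl's low half and each queue's stackPred field; a hedged toy (hypothetical encoding, count fields omitted) of the push (deactivate) and pop (signalWork) halves:

    import java.util.concurrent.atomic.AtomicInteger;

    // Toy Treiber stack of idle worker ids (0 == empty); the real code
    // packs this into ctl's low 32 bits alongside counts in the high bits.
    class IdleStack {
        final AtomicInteger top = new AtomicInteger(0);
        final int[] stackPred = new int[64];     // per-worker predecessor link

        boolean push(int id) {                   // deactivate: id in 1..63
            int cur = top.get();
            stackPred[id] = cur;                 // save link before CAS
            return top.compareAndSet(cur, id);   // caller retries on failure
        }

        int pop() {                              // signalWork: release top waiter
            int id = top.get();
            if (id != 0 && top.compareAndSet(id, stackPred[id]))
                return id;
            return 0;                            // empty or lost race; retry
        }
    }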
1905 
1906     /**
1907      * Releases all waiting workers. Called only during shutdown.
1908      */
1909     private void releaseWaiters() {
1910         for (long c = ctl;;) {
1911             WorkQueue[] qs; WorkQueue v; int sp, i;
1912             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1913                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1957                 else if ((e & SHUTDOWN) == 0)
1958                     return 0;
1959                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1960                     return 0;
1961                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1962                     return 1;                             // enable termination
1963                 else
1964                     break;                                // restart
1965             }
1966         }
1967     }
1968 
1969     /**
1970      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1971      * See above for explanation.
1972      *
1973      * @param w caller's WorkQueue (may be null on failed initialization)
1974      */
1975     final void runWorker(WorkQueue w) {
1976         if (w != null) {
1977             int phase = w.phase, r = w.stackPred;     // seed from registerWorker
1978             int fifo = w.config & FIFO, nsteals = 0, src = -1;
1979             for (;;) {
1980                 WorkQueue[] qs;
1981                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1982                 if ((runState & STOP) != 0L || (qs = queues) == null)
1983                     break;
1984                 int n = qs.length, i = r, step = (r >>> 16) | 1;
1985                 boolean rescan = false;
1986                 scan: for (int l = n; l > 0; --l, i += step) {  // scan queues
1987                     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
1988                     if ((q = qs[j = i & (n - 1)]) != null &&
1989                         (a = q.array) != null && (cap = a.length) > 0) {
1990                         for (int m = cap - 1, pb = -1, b = q.base;;) {
1991                             ForkJoinTask<?> t; long k;
1992                             t = (ForkJoinTask<?>)U.getReferenceAcquire(
1993                                 a, k = slotOffset(m & b));
1994                             if (b != (b = q.base) || t == null ||
1995                                 !U.compareAndSetReference(a, k, t, null)) {
1996                                 if (a[b & m] == null) {
1997                                     if (rescan)           // end of run
1998                                         break scan;
1999                                     if (a[(b + 1) & m] == null &&
2000                                         a[(b + 2) & m] == null) {
2001                                         break;            // probably empty
2002                                     }
2003                                     if (pb == (pb = b)) { // track progress
2004                                         rescan = true;    // stalled; reorder scan
2005                                         break scan;
2006                                     }
2007                                 }
2008                             }
2009                             else {
2010                                 boolean propagate;
2011                                 int nb = q.base = b + 1, prevSrc = src;
2012                                 w.nsteals = ++nsteals;
2013                                 w.source = src = j;       // volatile
2014                                 rescan = true;
2015                                 int nh = t.noUserHelp();
2016                                 if (propagate =
2017                                     (prevSrc != src || nh != 0) && a[nb & m] != null)
2018                                     signalWork();
2019                                 w.topLevelExec(t, fifo);
2020                                 if ((b = q.base) != nb && !propagate)
2021                                     break scan;          // reduce interference
2022                             }
2023                         }
2024                     }
2025                 }
2026                 if (!rescan) {
2027                     if (((phase = deactivate(w, phase)) & IDLE) != 0)
2028                         break;
2029                     src = -1;                            // re-enable propagation
2030                 }
2031             }
2032         }
2033     }
2034 
2035     /**
2036      * Deactivates and if necessary awaits signal or termination.
2037      *
2038      * @param w the worker
2039      * @param phase current phase
2040      * @return current phase, with IDLE set if worker should exit
2041      */
2042     private int deactivate(WorkQueue w, int phase) {
2043         if (w == null)                        // currently impossible
2044             return IDLE;
2045         int p = phase | IDLE, activePhase = phase + (IDLE << 1);
2046         long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
2047         int sp = w.stackPred = (int)pc;       // set ctl stack link
2048         w.phase = p;
2049         if (!compareAndSetCtl(pc, qc))        // try to enqueue
2050             return w.phase = phase;           // back out on possible signal
2051         int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
2052         if (((e = runState) & STOP) != 0L ||
2053             ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
2054             (qs = queues) == null || (n = qs.length) <= 0)
2055             return IDLE;                      // terminating
2056 
2057         for (int prechecks = Math.min(ac, 2), // reactivation threshold
2058              k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
2059             WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
2060             if (w.phase == activePhase)
2061                 return activePhase;
2062             if (--k < 0)
2063                 return awaitWork(w, p);       // block, drop, or exit
2064             if ((q = qs[k & (n - 1)]) == null)
2065                 Thread.onSpinWait();
2066             else if ((a = q.array) != null && (cap = a.length) > 0 &&
2067                      a[q.base & (cap - 1)] != null && --prechecks < 0 &&
2068                      (int)(c = ctl) == activePhase &&
2069                      compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
2070                 return w.phase = activePhase; // reactivate
2071         }
2072     }
2073 
2074     /**
2075      * Awaits signal or termination.
2076      *
2077      * @param w the work queue
2078      * @param p current phase (known to be idle)
2079      * @return current phase, with IDLE set if worker should exit
2080      */
2081     private int awaitWork(WorkQueue w, int p) {
2082         if (w != null) {
2083             ForkJoinWorkerThread t; long deadline;
2084             if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
2085                 t.resetThreadLocals();          // clear before reactivate
2086             if ((ctl & RC_MASK) > 0L)
2087                 deadline = 0L;
2088             else if ((deadline =
2089                       (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
2090                       System.currentTimeMillis()) == 0L)
2091                 deadline = 1L;                 // avoid zero
2092             int activePhase = p + IDLE;
2093             if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
2094                 LockSupport.setCurrentBlocker(this);
2095                 w.parking = 1;                 // enable unpark
2096                 while ((p = w.phase) != activePhase) {
2097                     boolean trimmable = false; int trim;
2098                     Thread.interrupted();      // clear status
2099                     if ((runState & STOP) != 0L)
2100                         break;
2101                     if (deadline != 0L) {
2102                         if ((trim = tryTrim(w, p, deadline)) > 0)
2103                             break;
2104                         else if (trim < 0)
2105                             deadline = 0L;
2106                         else
2107                             trimmable = true;
2108                     }
2109                     U.park(trimmable, deadline);
2110                 }
2111                 w.parking = 0;
2112                 LockSupport.setCurrentBlocker(null);
2113             }
2114         }
2115         return p;
2116     }
2117 
2118     /**
2119      * Tries to remove and deregister worker after timeout, and release
2120      * another to do the same.
 2121      * @return > 0: trimmed, < 0: not trimmable, else 0
2122      */
2123     private int tryTrim(WorkQueue w, int phase, long deadline) {
2124         long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
2125         if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
2126             stat = -1;                      // no longer ctl top
2127         else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
2128             stat = 0;                       // spurious wakeup
2129         else if (!compareAndSetCtl(
2130                      c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2131                                (TC_MASK & (c - TC_UNIT)))))
2132             stat = -1;                      // lost race to signaller
2133         else {
2134             stat = 1;
2135             w.source = DROPPED;
2136             w.phase = activePhase;
2137             if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2138                 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2139                 compareAndSetCtl(           // try to wake up next waiter
2140                     nc, ((UMASK & (nc + RC_UNIT)) |
2141                          (nc & TC_MASK) | (v.stackPred & LMASK)))) {
2142                 v.source = INVALID_ID;      // enable cascaded timeouts
2143                 v.phase = vp;
2144                 U.unpark(v.owner);
2145             }
2146         }
2147         return stat;

2544         else
2545             return 0;
2546     }
2547 
2548     /**
2549      * Gets and removes a local or stolen task for the given worker.
2550      *
2551      * @return a task, if available
2552      */
2553     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2554         ForkJoinTask<?> t;
2555         if (w == null || (t = w.nextLocalTask()) == null)
2556             t = pollScan(false);
2557         return t;
2558     }
2559 
2560     // External operations
2561 
2562     /**
2563      * Finds and locks a WorkQueue for an external submitter, or
2564      * throws RejectedExecutionException if shutdown or terminating.
2565      * @param r current ThreadLocalRandom.getProbe() value
2566      * @param rejectOnShutdown true if RejectedExecutionException
2567      *        should be thrown when shutdown (else only if terminating)
2568      */
2569     private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
2570         int reuse;                                   // nonzero if prefer create
2571         if ((reuse = r) == 0) {
2572             ThreadLocalRandom.localInit();           // initialize caller's probe
2573             r = ThreadLocalRandom.getProbe();
2574         }
2575         for (int probes = 0; ; ++probes) {
2576             int n, i, id; WorkQueue[] qs; WorkQueue q;
2577             if ((qs = queues) == null)
2578                 break;
2579             if ((n = qs.length) <= 0)
2580                 break;
2581             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2582                 WorkQueue w = new WorkQueue(null, id, 0, false);
2583                 w.phase = id;
2584                 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
2585                                   rejectOnShutdown);
2586                 if (!reject && queues == qs && qs[i] == null)
2587                     q = qs[i] = w;                   // else lost race to install
2588                 unlockRunState();
2589                 if (q != null)
2590                     return q;
2591                 if (reject)
2592                     break;
2593                 reuse = 0;
2594             }
2595             if (reuse == 0 || !q.tryLockPhase()) {   // move index
2596                 if (reuse == 0) {
2597                     if (probes >= n >> 1)
 2598                         reuse = r;                   // stop preferring free slot
2599                 }
2600                 else if (q != null)
2601                     reuse = 0;                       // probe on collision
2602                 r = ThreadLocalRandom.advanceProbe(r);
2603             }
2604             else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2605                 q.unlockPhase();                     // check while q lock held
2606                 break;
2607             }
2608             else
2609                 return q;
2610         }
2611         throw new RejectedExecutionException();
2612     }
2613 
2614     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2615         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2616         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2617             (wt = (ForkJoinWorkerThread)t).pool == this) {
2618             internal = true;
2619             q = wt.workQueue;
2620         }
2621         else {                     // find and lock queue
2622             internal = false;
2623             q = submissionQueue(ThreadLocalRandom.getProbe(), true);
2624         }
2625         q.push(task, signalIfEmpty ? this : null, internal);
2626         return task;
2627     }
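A usage example of the two paths above (illustrative pool size): a submission from a non-worker thread goes through submissionQueue, while fork() from inside a worker pushes internally to that worker's own queue without locking.

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;

    public class SubmitDemo {
        public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool(4);
            // External path: this thread is not a worker, so the submission
            // locks an external queue and may signal a worker.
            ForkJoinTask<Integer> outer = pool.submit(() ->
                // Internal path: runs on a worker; fork() pushes internally.
                ForkJoinTask.adapt(() -> 21).fork().join() * 2);
            System.out.println(outer.join());  // 42
            pool.shutdown();
        }
    }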
2628 
2629     /**
2630      * Returns queue for an external submission, bypassing call to
2631      * submissionQueue if already established and unlocked.
2632      */
2633     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2634         WorkQueue[] qs; WorkQueue q; int n;
2635         int r = ThreadLocalRandom.getProbe();
2636         return (((qs = queues) != null && (n = qs.length) > 0 &&
2637                  (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
2638                  q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
2639     }
2640 
2641     /**
2642      * Returns queue for an external thread, if one exists that has
2643      * possibly ever submitted to the given pool (nonzero probe), or
2644      * null if none.
2645      */
2646     static WorkQueue externalQueue(ForkJoinPool p) {
2647         WorkQueue[] qs; int n;
2648         int r = ThreadLocalRandom.getProbe();
2649         return (p != null && (qs = p.queues) != null &&
2650                 (n = qs.length) > 0 && r != 0) ?
2651             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2652     }
2653 
2654     /**
2655      * Returns external queue for common pool.
2656      */
2657     static WorkQueue commonQueue() {
2658         return externalQueue(common);
2659     }
2660 

 543      * WorkQueue field "phase" encodes the queue array id in lower
 544      * bits, and otherwise acts similarly to the pool runState field:
 545      * The "IDLE" bit is clear while active (either a released worker
 546      * or a locked external queue), with other bits serving as a
 547      * version counter to distinguish changes across multiple reads.
 548      * Note that phase field updates lag queue CAS releases; seeing a
 549      * non-idle phase does not guarantee that the worker is available
 550      * (and so is never checked in this way).
 551      *
 552      * The ctl field also serves as the basis for memory
 553      * synchronization surrounding activation. This uses a more
 554      * efficient version of a Dekker-like rule that task producers and
 555      * consumers sync with each other by both writing/CASing ctl (even
 556      * if to its current value).  However, rather than CASing ctl to
 557      * its current value in the common case where no action is
 558      * required, we reduce write contention by ensuring that
 559      * signalWork invocations are prefaced with a fully fenced memory
 560      * access (which is usually needed anyway).
 561      *
 562      * Signalling. Signals (in signalWork) cause new or reactivated
 563      * workers to scan for tasks.  SignalWork is invoked in two cases:
 564      * (1) When a task is pushed onto an empty queue, and (2) When a
 565      * worker takes a top-level task from a queue that has additional
 566      * tasks. Together, these suffice in O(log(#threads)) steps to
 567      * fully activate with at least enough workers, and ideally no
 568      * more than required.  This ideal is unobtainable: Callers do not
 569      * know whether another worker will finish its current task and
 570      * poll for others without need of a signal (which is otherwise an
 571      * advantage of work-stealing vs other schemes), and also must
 572      * conservatively estimate the triggering conditions of emptiness
 573      * or non-emptiness; all of which usually cause more activations
 574      * than necessary (see below). (Method signalWork is also used as
 575      * a failsafe in case of Thread failures in deregisterWorker, to
 576      * activate or create a new worker to replace them.)
 577      *
 578      * Top-Level scheduling
 579      * ====================
 580      *
 581      * Scanning. Method runWorker performs top-level scanning for (and
 582      * execution of) tasks by polling a pseudo-random permutation of
 583      * the array (by starting at a given index, and using a constant
 584      * cyclically exhaustive stride).  It uses the same basic polling
 585      * method as WorkQueue.poll(), but restarts with a different
 586      * permutation on each rescan.  The pseudorandom generator need
 587      * not have high-quality statistical properties in the long
 588      * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
 589      * from ThreadLocalRandom probes, which are cheap and suffice.
 590      *
 591      * Deactivation. When no tasks are found by a worker in runWorker,
 592      * it invokes awaitWork, which first deactivates (to an IDLE
 593      * phase).  Avoiding missed signals during deactivation requires a
 594      * (conservative) rescan, reactivating if there may be tasks to
 595      * poll. Because idle workers are often not yet blocked (parked),
 596      * we use a WorkQueue field to advertise that a waiter actually
 597      * needs unparking upon signal.
 598      *
 599      * When tasks are constructed as (recursive) DAGs, top-level
 600      * scanning is usually infrequent, and doesn't encounter most
 601      * of the following problems addressed by runWorker and awaitWork:
 602      *
 603      * Locality. Polls are organized into "runs", continuing until
 604      * empty or contended, while also minimizing interference by
 605      * postponing bookkeeping to ends of runs. This may reduce
 606      * fairness.
 607      *
 608      * Contention. When many workers try to poll few queues, they
 609      * often collide, generating CAS failures and disrupting locality
 610      * of workers already running their tasks. This also leads to
 611      * stalls when tasks cannot be taken because other workers have
 612      * not finished poll operations, which is detected by reading
 613      * ahead in queue arrays. In both cases, workers restart scans in a
 614      * way that approximates randomized backoff.
 615      *
 616      * Oversignalling. When many short top-level tasks are present in
 617      * a small number of queues, the above signalling strategy may
 618      * activate many more workers than needed, worsening locality and
 619      * contention problems, while also generating more global
 620      * contention (field ctl is CASed on every activation and
 621      * deactivation). We filter out (both in runWorker and
 622      * signalWork) attempted signals that are surely not needed
 623      * because the signalled tasks are already taken.
 624      *
 625      * Shutdown and Quiescence
 626      * =======================
 627      *
 628      * Quiescence. Workers scan looking for work, giving up when they
 629      * don't find any, without being sure that none are available.
 630      * However, some required functionality relies on consensus about
 631      * quiescence (also termination, discussed below). The count
 632      * fields in ctl allow accurate discovery of states in which all
 633      * workers are idle.  However, because external (asynchronous)
 634      * submitters are not part of this vote, these mechanisms
 635      * themselves do not guarantee that the pool is in a quiescent
 636      * state with respect to methods isQuiescent, shutdown (which
 637      * begins termination when quiescent), helpQuiesce, and indirectly
 638      * others including tryCompensate. Method quiescent() is used in
 639      * all of these contexts. It provides checks that all workers are
 640      * idle and there are no submissions that they could poll if they
 641      * were not idle, retrying on inconsistent reads of queues and
 642      * using the runState seqLock to retry on queue array updates.
 643      * (It also reports quiescence if the pool is terminating.) A true
 644      * report means only that there was a moment at which quiescence
 645      * held.  False negatives are inevitable (for example when queue
 646      * indices lag updates, as described above), which is accommodated

 856      * ====================
 857      *
 858      * Regular ForkJoinTasks manage task cancellation (method cancel)
 859      * independently from the interrupt status of threads running
 860      * tasks.  Interrupts are issued internally only while
 861      * terminating, to wake up workers and cancel queued tasks.  By
 862      * default, interrupts are cleared only when necessary to ensure
 863      * that calls to LockSupport.park do not loop indefinitely (park
 864      * returns immediately if the current thread is interrupted).
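     *
     * The corresponding idiom in awaitWork below clears status on
     * each iteration before blocking (eliding the intervening
     * rechecks):
     *
     *   Thread.interrupted();        // clear status
     *   // ... recheck phase and trim state ...
     *   U.park(trimmable, deadline); // would return at once if set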
 865      *
 866      * To comply with ExecutorService specs, we use subclasses of
 867      * abstract class InterruptibleTask for tasks that require
 868      * stronger interruption and cancellation guarantees.  External
 869      * submitters never run these tasks, even if in the common pool
 870      * (as indicated by ForkJoinTask.noUserHelp status bit).
 871      * InterruptibleTasks include a "runner" field (implemented
 872      * similarly to FutureTask) to support cancel(true).  Upon pool
 873      * shutdown, runners are interrupted so they can cancel. Since
 874      * external joining callers never run these tasks, they must await
 875      * cancellation by others, which can occur along several different
 876      * paths.
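     *
     * A minimal sketch of the runner idiom (the actual
     * InterruptibleTask is defined in ForkJoinTask; here,
     * trySetCancelled is a hypothetical stand-in for its status
     * update):
     *
     *   volatile Thread runner;   // set while body is executing
     *   public boolean cancel(boolean mayInterruptIfRunning) {
     *       boolean stat = trySetCancelled();
     *       Thread t;
     *       if (mayInterruptIfRunning && (t = runner) != null)
     *           t.interrupt();    // wake runner so it can stop
     *       return stat;
     *   }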
 877      *
 878      * Across these APIs, rules for reporting exceptions for tasks
 879      * with results accessed via join() differ from those via get(),
 880      * which differ from those invoked using pool submit methods by
 881      * non-workers (which comply with Future.get() specs). Internal
 882      * usages of ForkJoinTasks ignore interrupt status when executing
 883      * or awaiting completion.  Otherwise, reporting task results or
 884      * exceptions is preferred to throwing InterruptedExceptions,
 885      * which are in turn preferred to timeouts. Similarly, completion
 886      * status is preferred to reporting cancellation.  Cancellation is
 887      * reported as an unchecked exception by join(), and by worker
 888      * calls to get(), but is otherwise wrapped in a (checked)
 889      * ExecutionException.
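     *
     * Restating those rules for a task t cancelled before completion
     * (illustrative only):
     *
     *   t.join();   // throws unchecked CancellationException
     *   t.get();    // in a worker: unchecked CancellationException;
     *               // otherwise wrapped in checked ExecutionException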
 890      *
 891      * Worker Threads cannot be VirtualThreads, as enforced by
 892      * requiring ForkJoinWorkerThreads in factories.  There are
 893      * several constructions relying on this.  However as of this
 894      * writing, virtual thread bodies are by default run as some form
 895      * of InterruptibleTask.
 896      *

 923      * the placement of their fields. Cache misses and contention due
 924      * to false-sharing have been observed to slow down some programs
 925      * by more than a factor of four. Effects may vary across initial
 926      * memory configurations, applications, and different garbage
 927      * collectors and GC settings, so there is no perfect solution.
 928      * Too much isolation may generate more cache misses in common
 929      * cases (because some fields and slots are usually read at the
 930      * same time). The @Contended annotation provides only rough
 931      * control (for good reason). Similarly for relying on fields
 932      * being placed in size-sorted declaration order.
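     *
     * (Both appear in the WorkQueue declarations below; for example
     *
     *   @jdk.internal.vm.annotation.Contended("t")
     *   int top;
     *
     * isolates the busy top index, while a second group "w"
     * segregates owner-written status fields.)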
 933      *
 934      * We isolate the ForkJoinPool.ctl field that otherwise causes the
 935      * most false-sharing misses with respect to other fields. Also,
 936      * ForkJoinPool fields are ordered such that fields less prone to
 937      * contention effects are first, offsetting those that otherwise
 938      * would be, while also reducing total footprint vs using
 939      * multiple @Contended regions, which tends to slow down
 940      * less-contended applications. To help arrange this, some
 941      * non-reference fields are declared as "long" even when ints or
 942      * shorts would suffice.  For class WorkQueue, an
 943      * embedded @Contended isolates the very busy top index, and
 944      * another segregates status and bookkeeping fields written
 945      * (mostly) by owners, that otherwise interfere with reading
 946      * array, top, and base fields. There are other variables commonly
 947      * contributing to false-sharing-related performance issues
 948      * (including fields of class Thread), but we can't do much about
 949      * this except try to minimize access.
 950      *
 951      * Initial sizing and resizing of WorkQueue arrays is an even more
 952      * delicate tradeoff because the best strategy systematically
 953      * varies across garbage collectors. Small arrays are better for
 954      * locality and reduce GC scan time, but large arrays reduce both
 955      * direct false-sharing and indirect cases due to GC bookkeeping
 956      * (cardmarks etc), and reduce the number of resizes, which are
 957      * not especially fast because they require atomic transfers.
 958      * Currently, arrays are initialized to be just large enough to
 959      * avoid resizing in most tree-structured tasks, but grow rapidly
 960      * until large.  (Maintenance note: any changes in fields, queues,
 961      * or their uses, or JVM layout policies, must be accompanied by
 962      * re-evaluation of these placement and sizing decisions.)
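     *
     * With the current settings (see INITIAL_QUEUE_CAPACITY and
     * method growArray below), capacities run 64 -> 256 -> 1024 ->
     * 4096 -> 16384 -> 65536, quadrupling while below 1 << 16 and
     * doubling thereafter.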
 963      *
 964      * Style notes
 965      * ===========
 966      *
 967      * Memory ordering relies mainly on atomic operations (CAS,
 968      * getAndSet, getAndAdd) along with moded accesses. These use
 969      * jdk-internal Unsafe for atomics and special memory modes,
 970      * rather than VarHandles, to avoid initialization dependencies in
 971      * other jdk components that require early parallelism.  This can
 972      * be awkward and ugly, but also reflects the need to control
 973      * outcomes across the unusual cases that arise in very racy code
 974      * with very few invariants. All atomic task slot updates use
 975      * Unsafe operations requiring offset positions, not indices, as
 976      * computed by method slotOffset. All fields are read into locals
 977      * before use, and null-checked if they are references, even if
 978      * they can never be null under current usages. Usually,
 979      * computations (held in local variables) are defined as soon as
 980      * logically enabled, sometimes to convince compilers that they
 981      * may be performed despite memory ordering constraints.  Array
 982      * accesses using masked indices include checks (that are always

1025      */
1026     static final long DEFAULT_KEEPALIVE = 60_000L;
1027 
1028     /**
1029      * Undershoot tolerance for idle timeouts, also serving as the
1030      * minimum allowed timeout value.
1031      */
1032     static final long TIMEOUT_SLOP = 20L;
1033 
1034     /**
1035      * The default value for common pool maxSpares.  Overridable using
1036      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
1037      * system property.  The default value is far in excess of normal
1038      * requirements, but also far short of maximum capacity and typical OS
1039      * thread limits, so allows JVMs to catch misuse/abuse before
1040      * running out of resources needed to do so.
1041      */
1042     static final int DEFAULT_COMMON_MAX_SPARES = 256;
1043 
1044     /**
1045      * Initial capacity of work-stealing queue array.
1046      * Must be a power of two, at least 2. See above.
1047      */
1048     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
1049 
1050     // conversions among short, int, long
1051     static final int  SMASK           = 0xffff;      // (unsigned) short bits
1052     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
1053     static final long UMASK           = ~LMASK;      // upper 32 bits
1054 
1055     // masks and sentinels for queue indices
1056     static final int MAX_CAP          = 0x7fff;   // max # workers
1057     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
1058     static final int INVALID_ID       = 0x4000;   // unused external queue id
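    // Note: EXTERNAL_ID_MASK has bit 0 clear, so ids derived as
    // (probe & EXTERNAL_ID_MASK) are always even, while
    // registerWorker below assigns workers odd ids, keeping the two
    // id spaces disjoint.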
1059 
1060     // pool.runState bits
1061     static final long STOP            = 1L <<  0;   // terminating
1062     static final long SHUTDOWN        = 1L <<  1;   // terminate when quiescent
1063     static final long CLEANED         = 1L <<  2;   // stopped and queues cleared
1064     static final long TERMINATED      = 1L <<  3;   // only set if STOP also set
1065     static final long RS_LOCK         = 1L <<  4;   // lowest seqlock bit
1066 
1067     // spin/sleep limits for runState locking and elsewhere
1068     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
1069     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos

1161     static final class DefaultForkJoinWorkerThreadFactory
1162         implements ForkJoinWorkerThreadFactory {
1163         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
1164             return ((pool.workerNamePrefix == null) ? // is commonPool
1165                     new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool) :
1166                     new ForkJoinWorkerThread(null, pool, true, false));
1167         }
1168     }
1169 
1170     /**
1171      * Queues supporting work-stealing as well as external task
1172      * submission. See above for descriptions and algorithms.
1173      */
1174     static final class WorkQueue {
1175         // fields declared in order of their likely layout on most VMs
1176         final ForkJoinWorkerThread owner; // null if shared
1177         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
1178         int base;                  // index of next slot for poll
1179         final int config;          // mode bits
1180 
1181         @jdk.internal.vm.annotation.Contended("t") // segregate
1182         int top;                   // index of next slot for push
1183 
1184         // fields otherwise causing more unnecessary false-sharing cache misses
1185         @jdk.internal.vm.annotation.Contended("w")
1186         volatile int phase;        // versioned active status
1187         @jdk.internal.vm.annotation.Contended("w")
1188         int stackPred;             // pool stack (ctl) predecessor link
1189         @jdk.internal.vm.annotation.Contended("w")
1190         volatile int parking;      // nonzero if parked in awaitWork
1191         @jdk.internal.vm.annotation.Contended("w")
1192         volatile int source;       // source queue id (or DROPPED)
1193         @jdk.internal.vm.annotation.Contended("w")
1194         int nsteals;               // number of steals from other queues
1195 
1196         // Support for atomic operations
1197         private static final Unsafe U;
1198         private static final long PHASE;
1199         private static final long BASE;
1200         private static final long TOP;
1201         private static final long ARRAY;
1202 
1203         final void updateBase(int v) {
1204             U.putIntVolatile(this, BASE, v);
1205         }
1206         final void updateTop(int v) {
1207             U.putIntOpaque(this, TOP, v);
1208         }
1209         final void updateArray(ForkJoinTask<?>[] a) {
1210             U.getAndSetReference(this, ARRAY, a);
1211         }
1212         final void unlockPhase() {
1213             U.getAndAddInt(this, PHASE, IDLE);
1214         }
1215         final boolean tryLockPhase() {    // seqlock acquire
1216             int p;
1217             return (((p = phase) & IDLE) != 0 &&
1218                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
1219         }
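
        // Sketch of the shared-queue locking idiom built on the two
        // methods above (cf. push and externalSubmissionQueue below;
        // real callers unlock manually rather than via try/finally):
        //   if (q.tryLockPhase()) {
        //       try { /* modify q */ } finally { q.unlockPhase(); }
        //   }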
1220 
1221         /**
1222          * Constructor. For internal queues, most fields are initialized
1223          * upon thread start in pool.registerWorker.
1224          */
1225         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
1226                   boolean clearThreadLocals) {
1227             this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
1228             if ((this.owner = owner) == null) {
1229                 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1230                 phase = id | IDLE;
1231             }
1232         }
1233 
1234         /**
1235          * Returns an exportable index (used by ForkJoinWorkerThread).
1236          */
1237         final int getPoolIndex() {
1238             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
1239         }
1240 
1241         /**
1242          * Returns the approximate number of tasks in the queue.
1243          */
1244         final int queueSize() {
1245             int unused = phase;             // for ordering effect
1246             return Math.max(top - base, 0); // ignore transient negative
1247         }
1248 
1249         /**
1250          * Pushes a task. Called only by owner or if already locked
1251          *
1252          * @param task the task; no-op if null
1253          * @param pool the pool to signal if was previously empty, else null
1254          * @param internal if caller owns this queue
1255          * @throws RejectedExecutionException if array could not be resized
1256          */
1257         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
1258             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
1259             if ((a = array) != null && (cap = a.length) > 0) { // else disabled
1260                 if ((room = (m = cap - 1) - (s - b)) >= 0) {
1261                     top = s + 1;
1262                     long pos = slotOffset(m & s);
1263                     if (!internal)
1264                         U.putReference(a, pos, task);       // inside lock
1265                     else
1266                         U.getAndSetReference(a, pos, task); // fully fenced
1267                     if (room == 0 && (a = growArray(a, cap, s)) != null)
1268                         m = a.length - 1;                   // resize
1269                 }
1270                 if (!internal)
1271                     unlockPhase();
1272                 if (room < 0)
1273                     throw new RejectedExecutionException("Queue capacity exceeded");
1274                 if (pool != null && a != null &&
1275                     U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)
1276                     pool.signalWork(a, m & s);   // may have appeared empty
1277             }
1278         }
1279 
1280         /**
1281          * Resizes the queue array unless out of memory.
1282          * @param a old array
1283          * @param cap old array capacity
1284          * @param s current top
1285          * @return new array, or null on failure
1286          */
1287         private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
1288             int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
1289             ForkJoinTask<?>[] newArray = null;
1290             if (a != null && a.length == cap && cap > 0 && newCap > 0) {
1291                 try {
1292                     newArray = new ForkJoinTask<?>[newCap];
1293                 } catch (OutOfMemoryError ex) {
1294                 }
1295                 if (newArray != null) {               // else throw on next push
1296                     int mask = cap - 1, newMask = newCap - 1;
1297                     for (int k = s, j = cap; j > 0; --j, --k) {
1298                         ForkJoinTask<?> u;            // poll old, push to new
1299                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
1300                                  a, slotOffset(k & mask), null)) == null)
1301                             break;                    // lost to pollers
1302                         newArray[k & newMask] = u;
1303                     }
1304                     updateArray(newArray);           // fully fenced
1305                 }
1306             }
1307             return newArray;
1308         }
1309 
1310         /**
1311          * Takes next task, if one exists, in lifo order.
1312          */
1313         private ForkJoinTask<?> localPop() {
1314             ForkJoinTask<?> t = null;
1315             int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
1316             if ((a = array) != null && (cap = a.length) > 0 &&
1317                 U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
1318                 (t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
1319                 updateTop(s);
1320             return t;
1321         }
1322 
1323         /**
1324          * Takes next task, if one exists, in fifo order.
1325          */
1326         private ForkJoinTask<?> localPoll() {
1327             ForkJoinTask<?> t = null;
1328             int p = top, cap; ForkJoinTask<?>[] a;
1329             if ((a = array) != null && (cap = a.length) > 0) {
1330                 for (int b = base; p - b > 0; ) {
1331                     int nb = b + 1;
1332                     long k = slotOffset((cap - 1) & b);
1333                     if (U.getReference(a, k) == null) {
1334                         if (nb == p)
1335                             break;          // else base is lagging
1336                         while (b == (b = U.getIntAcquire(this, BASE)))
1337                             Thread.onSpinWait(); // spin to reduce memory traffic
1338                     }
1339                     else if ((t = (ForkJoinTask<?>)
1340                               U.getAndSetReference(a, k, null)) != null) {
1341                         updateBase(nb);
1342                         break;
1343                     }
1344                     else
1345                         b = base;
1346                 }
1347             }
1348             return t;
1349         }
1350 
1351         /**
1352          * Takes next task, if one exists, using configured mode.
1353          */
1354         final ForkJoinTask<?> nextLocalTask() {
1355             return (config & FIFO) == 0 ? localPop() : localPoll();
1356         }
1357 
1358         /**
1359          * Pops the given task only if it is at the current top.
1360          * @param task the task. Caller must ensure non-null.
1361          * @param internal if caller owns this queue
1362          */
1363         final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
1364             boolean taken = false;
1365             ForkJoinTask<?>[] a = array;
1366             int p = top, s = p - 1, cap; long k;
1367             if (a != null && (cap = a.length) > 0 &&
1368                 U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
1369                 (internal || tryLockPhase())) {
1370                 if (top == p && U.compareAndSetReference(a, k, task, null)) {
1371                     taken = true;
1372                     updateTop(s);
1373                 }
1374                 if (!internal)
1375                     unlockPhase();

1416                         break;                     // empty
1417                     if (pb == b)
1418                         Thread.onSpinWait();       // stalled
1419                 }
1420                 else if (U.compareAndSetReference(a, k, t, null)) {
1421                     updateBase(nb);
1422                     return t;
1423                 }
1424             }
1425             return null;
1426         }
1427 
1428         // specialized execution methods
1429 
1430         /**
1431          * Runs the given task, as well as remaining local tasks.
1432          */
1433         final void topLevelExec(ForkJoinTask<?> task, int fifo) {
1434             while (task != null) {
1435                 task.doExec();
1436                 task = (fifo == 0) ? localPop() : localPoll();
1437             }
1438         }
1439 
1440         /**
1441          * Deep form of tryUnpush: Traverses from top and removes and
1442          * runs task if present.
1443          */
1444         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
1445             ForkJoinTask<?>[] a = array;
1446             int b = base, p = top, s = p - 1, d = p - b, cap;
1447             if (a != null && (cap = a.length) > 0) {
1448                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
1449                     long k; boolean taken;
1450                     ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
1451                         a, k = slotOffset(i & m));
1452                     if (t == null)
1453                         break;
1454                     if (t == task) {
1455                         if (!internal && !tryLockPhase())
1456                             break;                  // fail if locked

1546                 }
1547                 else if (!(t instanceof CompletableFuture
1548                            .AsynchronousCompletionTask))
1549                     break;
1550                 if (blocker != null && blocker.isReleasable())
1551                     break;
1552                 if (base == b && t != null &&
1553                     U.compareAndSetReference(a, k, t, null)) {
1554                     updateBase(b + 1);
1555                     t.doExec();
1556                 }
1557             }
1558         }
1559 
1560         // misc
1561 
1562         /**
1563          * Cancels all local tasks. Called only by owner.
1564          */
1565         final void cancelTasks() {
1566             for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
1567                 try {
1568                     t.cancel(false);
1569                 } catch (Throwable ignore) {
1570                 }
1571             }
1572         }
1573 
1574         /**
1575          * Returns true if internal and not known to be blocked.
1576          */
1577         final boolean isApparentlyUnblocked() {
1578             Thread wt; Thread.State s;
1579             return ((wt = owner) != null && (phase & IDLE) != 0 &&
1580                     (s = wt.getState()) != Thread.State.BLOCKED &&
1581                     s != Thread.State.WAITING &&
1582                     s != Thread.State.TIMED_WAITING);
1583         }
1584 
1585         static {
1586             U = Unsafe.getUnsafe();

1748         return false;
1749     }
1750 
1751     /**
1752      * Provides a name for ForkJoinWorkerThread constructor.
1753      */
1754     final String nextWorkerThreadName() {
1755         String prefix = workerNamePrefix;
1756         long tid = incrementThreadIds() + 1L;
1757         if (prefix == null) // commonPool has no prefix
1758             prefix = "ForkJoinPool.commonPool-worker-";
1759         return prefix.concat(Long.toString(tid));
1760     }
1761 
1762     /**
1763      * Finishes initializing and records internal queue.
1764      *
1765      * @param w caller's WorkQueue
1766      */
1767     final void registerWorker(WorkQueue w) {
1768         if (w != null) {
1769             w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1770             ThreadLocalRandom.localInit();
1771             int seed = w.stackPred = ThreadLocalRandom.getProbe();
1772             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
1773             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
1774             long stop = lockRunState() & STOP;
1775             try {
1776                 WorkQueue[] qs; int n;
1777                 if (stop == 0L && (qs = queues) != null && (n = qs.length) > 0) {
1778                     for (int k = n, m = n - 1;  ; id += 2) {
1779                         if (qs[id &= m] == null)
1780                             break;
1781                         if ((k -= 2) <= 0) {
1782                             id |= n;
1783                             break;
1784                         }
1785                     }
1786                     w.phase = id | phaseSeq;    // now publishable
1787                     if (id < n)
1788                         qs[id] = w;
1789                     else {                      // expand

1827             do {} while (c != (c = compareAndExchangeCtl(
1828                                    c, ((RC_MASK & (c - RC_UNIT)) |
1829                                        (TC_MASK & (c - TC_UNIT)) |
1830                                        (LMASK & c)))));
1831         }
1832         if (phase != 0 && w != null) {     // remove index unless terminating
1833             long ns = w.nsteals & 0xffffffffL;
1834             if ((runState & STOP) == 0L) {
1835                 WorkQueue[] qs; int n, i;
1836                 if ((lockRunState() & STOP) == 0L &&
1837                     (qs = queues) != null && (n = qs.length) > 0 &&
1838                     qs[i = phase & SMASK & (n - 1)] == w) {
1839                     qs[i] = null;
1840                     stealCount += ns;      // accumulate steals
1841                 }
1842                 unlockRunState();
1843             }
1844         }
1845         if ((tryTerminate(false, false) & STOP) == 0L &&
1846             phase != 0 && w != null && w.source != DROPPED) {
1847             w.cancelTasks();               // clean queue
1848             signalWork(null, 0);           // possibly replace
1849         }
1850         if (ex != null)
1851             ForkJoinTask.rethrow(ex);
1852     }
1853 
1854     /**
1855      * Releases an idle worker, or creates one if not enough exist,
1856      * giving up if array a is nonnull and task at a[k] already taken.
1857      */
1858     final void signalWork(ForkJoinTask<?>[] a, int k) {
1859         int pc = parallelism;
1860         for (long c = ctl;;) {
1861             WorkQueue[] qs = queues;
1862             long ac = (c + RC_UNIT) & RC_MASK, nc;
1863             int sp = (int)c, i = sp & SMASK;
1864             if ((short)(c >>> RC_SHIFT) >= pc)
1865                 break;
1866             if (qs == null)
1867                 break;
1868             if (qs.length <= i)
1869                 break;
1870             WorkQueue w = qs[i], v = null;
1871             if (sp == 0) {
1872                 if ((short)(c >>> TC_SHIFT) >= pc)
1873                     break;
1874                 nc = ((c + TC_UNIT) & TC_MASK) | ac;
1875             }
1876             else if ((v = w) == null)
1877                 break;
1878             else
1879                 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
1880             if (a != null && k < a.length && k >= 0 && a[k] == null)
1881                 break;
1882             if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
1883                 if (v == null)
1884                     createWorker();
1885                 else {
1886                     v.phase = sp;
1887                     if (v.parking != 0)
1888                         U.unpark(v.owner);
1889                 }
1890                 break;
1891             }
1892         }
1893     }
1894 
1895     /**
1896      * Releases all waiting workers. Called only during shutdown.
1897      */
1898     private void releaseWaiters() {
1899         for (long c = ctl;;) {
1900             WorkQueue[] qs; WorkQueue v; int sp, i;
1901             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1902                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)

1946                 else if ((e & SHUTDOWN) == 0)
1947                     return 0;
1948                 else if ((ds = delayScheduler) != null && !ds.canShutDown())
1949                     return 0;
1950                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
1951                     return 1;                             // enable termination
1952                 else
1953                     break;                                // restart
1954             }
1955         }
1956     }
1957 
1958     /**
1959      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1960      * See above for explanation.
1961      *
1962      * @param w caller's WorkQueue (may be null on failed initialization)
1963      */
1964     final void runWorker(WorkQueue w) {
1965         if (w != null) {
1966             int r = w.stackPred;                          // seed from registerWorker
1967             int fifo = (int)config & FIFO;
1968             int nsteals = 0;                              // shadow w.nsteals
1969             boolean rescan = true;
1970             WorkQueue[] qs; int n;
1971             while ((rescan || deactivate(w) == 0) && (runState & STOP) == 0L &&
1972                    (qs = queues) != null && (n = qs.length) > 0) {
1973                 rescan = false;
1974                 int i = r, step = (r >>> 16) | 1;
1975                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1976                 scan: for (int j = n << 1; j != 0; --j, i += step) { // 2 sweeps
1977                     WorkQueue q; int qid;
1978                     if ((q = qs[qid = i & (n - 1)]) != null) {
1979                         for (;;) {                        // poll queue q
1980                             ForkJoinTask<?>[] a; int cap, b, m, nb, nk;
1981                             if ((a = q.array) == null || (cap = a.length) <= 0)
1982                                 break;
1983                             long bp = slotOffset((m = cap - 1) & (b = q.base));
1984                             long np = slotOffset(nk = m & (nb = b + 1));
1985                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1986                                 U.getReferenceAcquire(a, bp);
1987                             if (q.array != a || q.base != b ||
1988                                 U.getReference(a, bp) != t)
1989                                 continue;                 // inconsistent
1990                             if (t == null) {
1991                                 if (rescan) {             // end of run
1992                                     w.nsteals = nsteals;
1993                                     break scan;
1994                                 }
1995                                 if (U.getReference(a, np) != null) {
1996                                     rescan = true;        // stalled; reorder scan
1997                                     break scan;
1998                                 }
1999                                 break;                    // probably empty
2000                             }
2001                             if (U.compareAndSetReference(a, bp, t, null)) {
2002                                 q.base = nb;
2003                                 Object nt = U.getReferenceAcquire(a, np);
2004                                 if (!rescan) {            // begin run
2005                                     rescan = true;
2006                                     w.source = qid;
2007                                 }
2008                                 ++nsteals;
2009                                 if (nt != null &&         // confirm a[nk]
2010                                     U.getReference(a, np) == nt)
2011                                     signalWork(a, nk);    // propagate
2012                                 w.topLevelExec(t, fifo);  // run t & its subtasks
2013                             }
2014                         }
2015                     }
2016                 }
2017             }
2018         }
2019     }
2020 
2021     /**
2022      * Deactivates and awaits signal or termination.
2023      *
2024      * @param w the work queue
2025      * @return zero if now active
2026      */
2027     private int deactivate(WorkQueue w) {
2028         int idle = 1;
2029         if (w != null) {                        // always true; hoist checks
2030             int inactive = w.phase |= IDLE;     // set status
2031             int activePhase = inactive + IDLE;  // phase value when reactivated
2032             long ap = activePhase & LMASK, pc = ctl, qc;
2033             do {                                // enqueue
2034                 qc = ap | ((pc - RC_UNIT) & UMASK);
2035                 w.stackPred = (int)pc;          // set ctl stack link
2036             } while (pc != (pc = compareAndExchangeCtl(pc, qc)));
2037 
2038             WorkQueue[] qs; int n; long e;
 2039             if (((e = runState) & STOP) == 0L && // quiescence checks
2040                 ((e & SHUTDOWN) == 0L || (qc & RC_MASK) > 0L || quiescent() <= 0) &&
2041                 (qs = queues) != null && (n = qs.length) > 1) {
2042                 long psp = pc & LMASK;          // ctl predecessor prefix
2043                 for (int i = 1; i < n; ++i) {   // scan; stagger origins
2044                     WorkQueue q; long c;        // missed signal check
2045                     if ((q = qs[(activePhase + i) & (n - 1)]) != null &&
2046                         q.top - q.base > 0) {
2047                         if ((idle = w.phase - activePhase) != 0 &&
2048                             (int)(c = ctl) == activePhase &&
2049                             compareAndSetCtl(c, psp | ((c + RC_UNIT) & UMASK))) {
2050                             w.phase = activePhase;
2051                             idle = 0;           // reactivated
2052                         }                       // else ineligible or lost race
2053                         break;
2054                     }
2055                 }
2056                 if (idle != 0 && (idle = w.phase - activePhase) != 0)
2057                     idle = awaitWork(w, activePhase, n);
2058             }
2059         }
2060         return idle;
2061     }
2062 
2063     /**
2064      * Awaits signal or termination.
2065      *
2066      * @param w the work queue
2067      * @param activePhase w's next active phase
2068      * @param qsize current size of queues array
2069      * @return zero if now active
2070      */
2071     private int awaitWork(WorkQueue w, int activePhase, int qsize) {
2072         int idle = 1;
2073         int spins = qsize | (qsize - 1);      // approx traversal cost
2074         if (w != null) {                      // always true; hoist checks
2075             boolean trimmable; long deadline, c;
2076             long trimTime = (w.source == INVALID_ID) ? TIMEOUT_SLOP : keepAlive;
2077             if ((w.config & CLEAR_TLS) != 0 && // instanceof check always true
2078                 Thread.currentThread() instanceof ForkJoinWorkerThread f)
2079                 f.resetThreadLocals();        // clear while accessing thread state
2080             LockSupport.setCurrentBlocker(this);
2081             if (trimmable = (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase))
2082                 deadline = trimTime + System.currentTimeMillis();
2083             else
2084                 deadline = 0L;
2085             for (;;) {
2086                 int s = spins, trim;
2087                 Thread.interrupted();         // clear status
2088                 if ((runState & STOP) != 0L)
2089                     break;
2090                 while ((idle = w.phase - activePhase) != 0 && --s != 0)
2091                     Thread.onSpinWait();      // spin before blocking
2092                 if (idle == 0)
2093                     break;
2094                 if (trimmable &&
2095                     (trim = tryTrim(w, activePhase, deadline)) != 0) {
2096                     if (trim > 0)
2097                         break;
2098                     trimmable = false;
2099                     deadline = 0L;
2100                 }
2101                 w.parking = 1;                // enable unpark and recheck
2102                 if ((idle = w.phase - activePhase) != 0)
2103                     U.park(trimmable, deadline);
2104                 w.parking = 0;                // close unpark window
2105                 if (idle == 0 || (idle = w.phase - activePhase) == 0)
2106                     break;
2107             }
2108             LockSupport.setCurrentBlocker(null);
2109         }
2110         return idle;
2111     }
2112 
2113     /**
2114      * Tries to remove and deregister worker after timeout, and release
2115      * another to do the same.
 2116      * @return > 0: trimmed, < 0: not trimmable, else 0
2117      */
2118     private int tryTrim(WorkQueue w, int activePhase, long deadline) {
2119         long c, nc; int stat, vp, i; WorkQueue[] vs; WorkQueue v;
2120         long waitTime = deadline - System.currentTimeMillis();
2121         if ((int)(c = ctl) != activePhase || w == null)
2122             stat = -1;                      // no longer ctl top
2123         else if (waitTime > TIMEOUT_SLOP)
2124             stat = 0;                       // spurious wakeup
2125         else if (!compareAndSetCtl(
2126                      c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
2127                                (TC_MASK & (c - TC_UNIT)))))
2128             stat = -1;                      // lost race to signaller
2129         else {
2130             stat = 1;
2131             w.source = DROPPED;
2132             w.phase = activePhase;
2133             if ((vp = (int)nc) != 0 && (vs = queues) != null &&
2134                 vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
2135                 compareAndSetCtl(           // try to wake up next waiter
2136                     nc, ((UMASK & (nc + RC_UNIT)) |
2137                          (nc & TC_MASK) | (v.stackPred & LMASK)))) {
2138                 v.source = INVALID_ID;      // enable cascaded timeouts
2139                 v.phase = vp;
2140                 U.unpark(v.owner);
2141             }
2142         }
2143         return stat;

2540         else
2541             return 0;
2542     }
2543 
2544     /**
2545      * Gets and removes a local or stolen task for the given worker.
2546      *
2547      * @return a task, if available
2548      */
2549     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
2550         ForkJoinTask<?> t;
2551         if (w == null || (t = w.nextLocalTask()) == null)
2552             t = pollScan(false);
2553         return t;
2554     }
2555 
2556     // External operations
2557 
2558     /**
2559      * Finds and locks a WorkQueue for an external submitter, or
 2560      * throws RejectedExecutionException if shutdown.
2561      * @param rejectOnShutdown true if RejectedExecutionException
2562      *        should be thrown when shutdown
2563      */
2564     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
2565         int r;
2566         if ((r = ThreadLocalRandom.getProbe()) == 0) {
2567             ThreadLocalRandom.localInit();   // initialize caller's probe
2568             r = ThreadLocalRandom.getProbe();
2569         }
2570         for (;;) {
2571             WorkQueue q; WorkQueue[] qs; int n, id, i;
2572             if ((qs = queues) == null || (n = qs.length) <= 0)
2573                 break;
2574             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
2575                 WorkQueue newq = new WorkQueue(null, id, 0, false);
2576                 lockRunState();
2577                 if (qs[i] == null && queues == qs)
2578                     q = qs[i] = newq;         // else lost race to install
2579                 unlockRunState();
2580             }
2581             if (q != null && q.tryLockPhase()) {
2582                 if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
2583                     q.unlockPhase();          // check while q lock held
2584                     break;
2585                 }
2586                 return q;
2587             }
2588             r = ThreadLocalRandom.advanceProbe(r); // move
2589         }
2590         throw new RejectedExecutionException();
2591     }
2592 
2593     private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
2594         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
2595         if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
2596             (wt = (ForkJoinWorkerThread)t).pool == this) {
2597             internal = true;
2598             q = wt.workQueue;
2599         }
2600         else {                     // find and lock queue
2601             internal = false;
2602             q = externalSubmissionQueue(true);
2603         }
2604         q.push(task, signalIfEmpty ? this : null, internal);
2605         return task;
2606     }
2607 
2608     /**
2609      * Returns queue for an external thread, if one exists that has
2610      * possibly ever submitted to the given pool (nonzero probe), or
2611      * null if none.
2612      */
2613     static WorkQueue externalQueue(ForkJoinPool p) {
2614         WorkQueue[] qs; int n;
2615         int r = ThreadLocalRandom.getProbe();
2616         return (p != null && (qs = p.queues) != null &&
2617                 (n = qs.length) > 0 && r != 0) ?
2618             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
2619     }
2620 
2621     /**
2622      * Returns external queue for common pool.
2623      */
2624     static WorkQueue commonQueue() {
2625         return externalQueue(common);
2626     }
2627 