src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
* required, we reduce write contention by ensuring that
* signalWork invocations are prefaced with a fully fenced memory
* access (which is usually needed anyway).
*
* Signalling. Signals (in signalWork) cause new or reactivated
! * workers to scan for tasks. Method signalWork and its callers
! * try to approximate the unattainable goal of having the right
! * number of workers activated for the tasks at hand, but must err
! * on the side of too many workers vs too few to avoid stalls:
! *
! * * If computations are purely tree structured, it suffices for
! * every worker to activate another when it pushes a task into
! * an empty queue, resulting in O(log(#threads)) steps to full
! * activation. Emptiness must be conservatively approximated,
! * which may result in unnecessary signals. Also, to reduce
! * resource usages in some cases, at the expense of slower
! * startup in others, activation of an idle thread is preferred
! * over creating a new one, here and elsewhere.
! *
! * * At the other extreme, if "flat" tasks (those that do not in
! * turn generate others) come in serially from only a single
! * producer, each worker taking a task from a queue should
- * propagate a signal if there are more tasks in that
- * queue. This is equivalent to, but generally faster than,
- * arranging for the stealer to take multiple tasks, re-pushing
- * one or more on its own queue, and signalling (because its
- * queue is empty), also resulting in logarithmic full
- * activation time. If tasks do not engage in unbounded loops
- * based on the actions of other workers with unknown
- * dependencies, this form of propagation can be limited to one signal per
- * activation (phase change). We distinguish the cases by
- * further signalling only if the task is an InterruptibleTask
- * (see below), which are the only supported forms of task that
- * may do so.
- *
- * * Because we don't know about usage patterns (or most commonly,
- * mixtures), we use both approaches, which present even more
- * opportunities to over-signal. (Failure to distinguish these
- * cases in terms of submission methods was arguably an early
- * design mistake.) Note that in either of these contexts,
- * signals may be (and often are) unnecessary because active
- * workers continue scanning after running tasks without the
- * need to be signalled (which is one reason work stealing is
- * often faster than alternatives), so additional workers
- * aren't needed.
- *
- * * For rapidly branching tasks that require full pool resources,
- * oversignalling is OK, because signalWork will soon have no
- * more workers to create or reactivate. But for others (mainly
- * externally submitted tasks), overprovisioning may cause very
- * noticeable slowdowns due to contention and resource
- * wastage. We reduce impact by deactivating workers when
- * queues don't have accessible tasks, but reactivating and
- * rescanning if other tasks remain.
- *
- * * Despite these, signal contention and overhead effects still
- * occur during ramp-up and ramp-down of small computations.
*
* Scanning. Method runWorker performs top-level scanning for (and
* execution of) tasks by polling a pseudo-random permutation of
* the array (by starting at a given index, and using a constant
* cyclically exhaustive stride.) It uses the same basic polling
* method as WorkQueue.poll(), but restarts with a different
! * permutation on each invocation. The pseudorandom generator
! * need not have high-quality statistical properties in the long
* term. We use Marsaglia XorShifts, seeded with the Weyl sequence
! * from ThreadLocalRandom probes, which are cheap and
- * suffice. Each queue's polling attempts to avoid becoming stuck
- * when other scanners/pollers stall. Scans do not otherwise
- * explicitly take into account core affinities, loads, cache
- * localities, etc. However, they do exploit temporal locality
- * (which usually approximates these) by preferring to re-poll
- * from the same queue after a successful poll before trying
- * others, which also reduces bookkeeping, cache traffic, and
- * scanning overhead. But it also reduces fairness, which is
- * partially counteracted by giving up on detected interference
- * (which also reduces contention when too many workers try to
- * take small tasks from the same queue).
*
* Deactivation. When no tasks are found by a worker in runWorker,
! * it tries to deactivate(), giving up (and rescanning) on "ctl"
! * contention. To avoid missed signals, the method rescans and
! * reactivates if there may have been a missed signal during
! * deactivation. To reduce false-alarm reactivations
! * while doing so, we scan multiple times (analogously to method
! * quiescent()) before trying to reactivate. Because idle workers
! * are often not yet blocked (parked), we use a WorkQueue field to
! * advertise that a waiter actually needs unparking upon signal.
*
* Quiescence. Workers scan looking for work, giving up when they
* don't find any, without being sure that none are available.
* However, some required functionality relies on consensus about
* quiescence (also termination, discussed below). The count
* required, we reduce write contention by ensuring that
* signalWork invocations are prefaced with a fully fenced memory
* access (which is usually needed anyway).
*
* Signalling. Signals (in signalWork) cause new or reactivated
! * workers to scan for tasks. Method signalWork is invoked in two
! * cases: (1) when a task is pushed onto an empty queue, and (2) when a
! * worker takes a top-level task from a queue that has additional
! * tasks. Together, these suffice in O(log(#threads)) steps to
! * fully activate with at least enough workers, and ideally no
! * more than required. This ideal is unobtainable: Callers do not
! * know whether another worker will finish its current task and
! * poll for others without need of a signal (which is otherwise an
! * advantage of work-stealing vs other schemes), and also must
! * conservatively estimate the triggering conditions of emptiness
! * or non-emptiness, all of which usually cause more activations
! * than necessary (see below). (Method signalWork is also used as
! * a failsafe in case of thread failures in deregisterWorker, to
! * activate or create a new worker to replace them.)
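 *
 * As a hedged illustration of the two triggering cases (a Deque
 * stand-in with hypothetical names; the real code uses lock-free
 * array queues and the filtered signalWork(a, k) described below):
 *
 *   void pushAndMaybeSignal(Deque<Runnable> q, Runnable task) {
 *       boolean wasEmpty = q.isEmpty();
 *       q.push(task);
 *       if (wasEmpty)                   // case (1): push to empty
 *           signalWork();
 *   }
 *   Runnable takeAndMaybeSignal(Deque<Runnable> q) {
 *       Runnable t = q.poll();
 *       if (t != null && !q.isEmpty())  // case (2): more remain
 *           signalWork();
 *       return t;
 *   }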
! *
! * Top-Level scheduling
! * ====================
*
* Scanning. Method runWorker performs top-level scanning for (and
* execution of) tasks by polling a pseudo-random permutation of
* the array (by starting at a given index, and using a constant
* cyclically exhaustive stride.) It uses the same basic polling
* method as WorkQueue.poll(), but restarts with a different
! * permutation on each rescan. The pseudorandom generator need
! * not have high-quality statistical properties in the long
* term. We use Marsaglia XorShifts, seeded with the Weyl sequence
! * from ThreadLocalRandom probes, which are cheap and suffice.
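 *
 * As a sketch (hypothetical scan() helper; compare the loop in
 * runWorker below): an odd step is coprime with any power-of-two
 * array length, so each sweep visits every queue exactly once.
 *
 *   r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
 *   int i = r, step = (r >>> 16) | 1;         // forced odd
 *   for (int l = n; l > 0; --l, i += step)    // n a power of two
 *       scan(qs[i & (n - 1)]);                // visits all queues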
*
* Deactivation. When no tasks are found by a worker in runWorker,
! * it invokes awaitWork, which first deactivates (to an IDLE
! * phase). Avoiding missed signals during deactivation requires a
! * (conservative) rescan, reactivating if there may be tasks to
! * poll. Because idle workers are often not yet blocked (parked),
! * we use a WorkQueue field to advertise that a waiter actually
! * needs unparking upon signal.
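 *
 * A minimal sketch of the idiom (the parking and phase fields are
 * real; the control flow in awaitWork below is more involved):
 *
 *   parking = 1;                  // advertise that unpark is needed
 *   if (phase != activePhase)     // recheck to avoid missed signal
 *       U.park(false, 0L);
 *   parking = 0;                  // close unpark window
 *
 * with the signalling side, after reactivating worker v, doing:
 *
 *   v.phase = sp;
 *   if (v.parking != 0)
 *       U.unpark(v.owner);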
! *
! * When tasks are constructed as (recursive) DAGs, top-level
+ * scanning is usually infrequent, and doesn't encounter most
+ * of the following problems addressed by runWorker and awaitWork:
+ *
+ * Locality. Polls are organized into "runs", continuing until
+ * empty or contended, while also minimizing interference by
+ * postponing bookkeeping to ends of runs. This may reduce
+ * fairness.
+ *
+ * Contention. When many workers try to poll few queues, they
+ * often collide, generating CAS failures and disrupting locality
+ * of workers already running their tasks. This also leads to
+ * stalls when tasks cannot be taken because other workers have
+ * not finished poll operations, which is detected by reading
+ * ahead in queue arrays. In both cases, workers restart scans in a
+ * way that approximates randomized backoff.
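 *
 * For example (hedged; compare the null read-ahead in runWorker
 * below), if the slot at base is null but the next slot is not,
 * another poller has claimed base without yet finishing, so the
 * scan restarts rather than spinning:
 *
 *   if (a[b & m] == null && a[(b + 1) & m] != null)
 *       rescan = true;            // stalled; reorder scan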
+ *
+ * Oversignalling. When many short top-level tasks are present in
+ * a small number of queues, the above signalling strategy may
+ * activate many more workers than needed, worsening locality and
+ * contention problems, while also generating more global
+ * contention (field ctl is CASed on every activation and
+ * deactivation). We filter out (both in runWorker and
+ * signalWork) attempted signals that are surely not needed
+ * because the signalled tasks are already taken.
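 *
 * The filter amounts to rechecking the advertised slot before
 * committing a ctl update, as in this sketch of the test in
 * signalWork(a, k) below:
 *
 *   if (a != null && k >= 0 && k < a.length && a[k] == null)
 *       return;                   // already taken; skip signal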
+ *
+ * Shutdown and Quiescence
+ * =======================
*
* Quiescence. Workers scan looking for work, giving up when they
* don't find any, without being sure that none are available.
* However, some required functionality relies on consensus about
* quiescence (also termination, discussed below). The count
* InterruptibleTasks include a "runner" field (implemented
* similarly to FutureTask) to support cancel(true). Upon pool
* shutdown, runners are interrupted so they can cancel. Since
* external joining callers never run these tasks, they must await
* cancellation by others, which can occur along several different
! * paths. The inability to rely on caller-runs may also require
- * extra signalling (resulting in scanning and contention), so it
- * is done only conditionally in methods push and runWorker.
*
* Across these APIs, rules for reporting exceptions for tasks
* with results accessed via join() differ from those via get(),
* which differ from those invoked using pool submit methods by
* non-workers (which comply with Future.get() specs). Internal
* InterruptibleTasks include a "runner" field (implemented
* similarly to FutureTask) to support cancel(true). Upon pool
* shutdown, runners are interrupted so they can cancel. Since
* external joining callers never run these tasks, they must await
* cancellation by others, which can occur along several different
! * paths.
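 *
 * A hedged sketch of the FutureTask-style runner mechanics (names
 * illustrative; InterruptibleTask differs in detail):
 *
 *   volatile Thread runner;
 *   void runIt() {
 *       runner = Thread.currentThread();
 *       try { doExec(); } finally { runner = null; }
 *   }
 *   boolean cancel(boolean mayInterruptIfRunning) {
 *       Thread t;
 *       if (mayInterruptIfRunning && (t = runner) != null)
 *           t.interrupt();        // supports cancel(true)
 *       return trySetCancelled(); // hypothetical status transition
 *   }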
*
* Across these APIs, rules for reporting exceptions for tasks
* with results accessed via join() differ from those via get(),
* which differ from those invoked using pool submit methods by
* non-workers (which comply with Future.get() specs). Internal
* would be, while also reducing total footprint vs using
* multiple @Contended regions, which tends to slow down
* less-contended applications. To help arrange this, some
* non-reference fields are declared as "long" even when ints or
* shorts would suffice. For class WorkQueue, an
! * embedded @Contended region segregates fields most heavily
! * updated by owners from those most commonly read by stealers or
! * other management.
*
* Initial sizing and resizing of WorkQueue arrays is an even more
* delicate tradeoff because the best strategy systematically
* varies across garbage collectors. Small arrays are better for
* locality and reduce GC scan time, but large arrays reduce both
* direct false-sharing and indirect cases due to GC bookkeeping
* (cardmarks etc), and reduce the number of resizes, which are
* not especially fast because they require atomic transfers.
! * Currently, arrays for workers are initialized to be just large
! * enough to avoid resizing in most tree-structured tasks, but
! * larger for external queues where both false-sharing problems
! * and the need for resizing are more common. (Maintenance note:
! * any changes in fields, queues, or their uses, or JVM layout
- * policies, must be accompanied by re-evaluation of these
- * placement and sizing decisions.)
*
* Style notes
* ===========
*
* Memory ordering relies mainly on atomic operations (CAS,
* would be, while also reducing total footprint vs using
* multiple @Contended regions, which tends to slow down
* less-contended applications. To help arrange this, some
* non-reference fields are declared as "long" even when ints or
* shorts would suffice. For class WorkQueue, an
! * embedded @Contended isolates the very busy top index, and
! * another segregates status and bookkeeping fields written
! * (mostly) by owners, which otherwise interfere with reading
+ * array, top, and base fields. There are other variables commonly
+ * contributing to false-sharing-related performance issues
+ * (including fields of class Thread), but we can't do much about
+ * this except try to minimize access.
*
* Initial sizing and resizing of WorkQueue arrays is an even more
* delicate tradeoff because the best strategy systematically
* varies across garbage collectors. Small arrays are better for
* locality and reduce GC scan time, but large arrays reduce both
* direct false-sharing and indirect cases due to GC bookkeeping
* (cardmarks etc), and reduce the number of resizes, which are
* not especially fast because they require atomic transfers.
! * Currently, arrays are initialized to be just large enough to
! * avoid resizing in most tree-structured tasks, but grow rapidly
! * until large. (Maintenance note: any changes in fields, queues,
! * or their uses, or JVM layout policies, must be accompanied by
! * re-evaluation of these placement and sizing decisions.)
*
* Style notes
* ===========
*
* Memory ordering relies mainly on atomic operations (CAS,
* running out of resources needed to do so.
*/
static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
! * Initial capacity of work-stealing queue array for workers.
* Must be a power of two, at least 2. See above.
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
- /**
- * Initial capacity of work-stealing queue array for external queues.
- * Must be a power of two, at least 2. See above.
- */
- static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
-
// conversions among short, int, long
static final int SMASK = 0xffff; // (unsigned) short bits
static final long LMASK = 0xffffffffL; // lower 32 bits of long
static final long UMASK = ~LMASK; // upper 32 bits
* running out of resources needed to do so.
*/
static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
! * Initial capacity of work-stealing queue array.
* Must be a power of two, at least 2. See above.
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
// conversions among short, int, long
static final int SMASK = 0xffff; // (unsigned) short bits
static final long LMASK = 0xffffffffL; // lower 32 bits of long
static final long UMASK = ~LMASK; // upper 32 bits
final ForkJoinWorkerThread owner; // null if shared
ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
int base; // index of next slot for poll
final int config; // mode bits
! // fields otherwise causing more unnecessary false-sharing cache misses
- @jdk.internal.vm.annotation.Contended("w")
int top; // index of next slot for push
@jdk.internal.vm.annotation.Contended("w")
volatile int phase; // versioned active status
@jdk.internal.vm.annotation.Contended("w")
int stackPred; // pool stack (ctl) predecessor link
@jdk.internal.vm.annotation.Contended("w")
volatile int source; // source queue id (or DROPPED)
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
- @jdk.internal.vm.annotation.Contended("w")
- volatile int parking; // nonzero if parked in awaitWork
// Support for atomic operations
private static final Unsafe U;
private static final long PHASE;
private static final long BASE;
final ForkJoinWorkerThread owner; // null if shared
ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
int base; // index of next slot for poll
final int config; // mode bits
! @jdk.internal.vm.annotation.Contended("t") // segregate
int top; // index of next slot for push
+
+ // fields otherwise causing more unnecessary false-sharing cache misses
@jdk.internal.vm.annotation.Contended("w")
volatile int phase; // versioned active status
@jdk.internal.vm.annotation.Contended("w")
int stackPred; // pool stack (ctl) predecessor link
@jdk.internal.vm.annotation.Contended("w")
+ volatile int parking; // nonzero if parked in awaitWork
+ @jdk.internal.vm.annotation.Contended("w")
volatile int source; // source queue id (or DROPPED)
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
// Support for atomic operations
private static final Unsafe U;
private static final long PHASE;
private static final long BASE;
* Constructor. For internal queues, most fields are initialized
* upon thread start in pool.registerWorker.
*/
WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
boolean clearThreadLocals) {
- array = new ForkJoinTask<?>[owner == null ?
- INITIAL_EXTERNAL_QUEUE_CAPACITY :
- INITIAL_QUEUE_CAPACITY];
- this.owner = owner;
this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
}
/**
* Returns an exportable index (used by ForkJoinWorkerThread).
*/
* Constructor. For internal queues, most fields are initialized
* upon thread start in pool.registerWorker.
*/
WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
boolean clearThreadLocals) {
this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
+ if ((this.owner = owner) == null) {
+ array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
+ phase = id | IDLE;
+ }
}
/**
* Returns an exportable index (used by ForkJoinWorkerThread).
*/
* @param internal if caller owns this queue
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
! if ((a = array) != null && (cap = a.length) > 0 && // else disabled
- task != null) {
- int pk = task.noUserHelp() + 1; // prev slot offset
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(m & s);
if (!internal)
U.putReference(a, pos, task); // inside lock
else
U.getAndSetReference(a, pos, task); // fully fenced
! if (room == 0) // resize
! growArray(a, cap, s);
}
if (!internal)
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
! if ((room == 0 || a[m & (s - pk)] == null) &&
! pool != null)
! pool.signalWork(); // may have appeared empty
}
}
/**
* Resizes the queue array unless out of memory.
* @param a old array
* @param cap old array capacity
* @param s current top
*/
! private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
! int newCap = cap << 1;
if (a != null && a.length == cap && cap > 0 && newCap > 0) {
- ForkJoinTask<?>[] newArray = null;
try {
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
}
if (newArray != null) { // else throw on next push
* @param internal if caller owns this queue
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
! if ((a = array) != null && (cap = a.length) > 0) { // else disabled
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(m & s);
if (!internal)
U.putReference(a, pos, task); // inside lock
else
U.getAndSetReference(a, pos, task); // fully fenced
! if (room == 0 && (a = growArray(a, cap, s)) != null)
! m = a.length - 1; // resize
}
if (!internal)
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
! if (pool != null && a != null &&
! U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)
! pool.signalWork(a, m & s); // may have appeared empty
}
}
/**
* Resizes the queue array unless out of memory.
* @param a old array
* @param cap old array capacity
* @param s current top
+ * @return new array, or null on failure
*/
! private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
! int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
+ ForkJoinTask<?>[] newArray = null;
if (a != null && a.length == cap && cap > 0 && newCap > 0) {
try {
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
}
if (newArray != null) { // else throw on next push
newArray[k & newMask] = u;
}
updateArray(newArray); // fully fenced
}
}
}
/**
! * Takes next task, if one exists, in order specified by mode,
- * so acts as either local-pop or local-poll. Called only by owner.
- * @param fifo nonzero if FIFO mode
*/
! private ForkJoinTask<?> nextLocalTask(int fifo) {
ForkJoinTask<?> t = null;
! ForkJoinTask<?>[] a = array;
! int b = base, p = top, cap;
! if (p - b > 0 && a != null && (cap = a.length) > 0) {
! for (int m = cap - 1, s, nb;;) {
! if (fifo == 0 || (nb = b + 1) == p) {
! if ((t = (ForkJoinTask<?>)U.getAndSetReference(
! a, slotOffset(m & (s = p - 1)), null)) != null)
! updateTop(s); // else lost race for only task
! break;
}
! if ((t = (ForkJoinTask<?>)U.getAndSetReference(
! a, slotOffset(m & b), null)) != null) {
updateBase(nb);
break;
}
! while (b == (b = U.getIntAcquire(this, BASE)))
! Thread.onSpinWait(); // spin to reduce memory traffic
- if (p - b <= 0)
- break;
}
}
return t;
}
/**
* Takes next task, if one exists, using configured mode.
- * (Always internal, never called for Common pool.)
*/
final ForkJoinTask<?> nextLocalTask() {
! return nextLocalTask(config & FIFO);
}
/**
* Pops the given task only if it is at the current top.
* @param task the task. Caller must ensure non-null.
newArray[k & newMask] = u;
}
updateArray(newArray); // fully fenced
}
}
+ return newArray;
}
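/*
 * Illustrative growth schedule under this policy (quadruple below
 * 1 << 16, then double), starting from INITIAL_QUEUE_CAPACITY:
 *
 *   for (int cap = 1 << 6; cap <= 1 << 17; ) {
 *       int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
 *       System.out.println(cap + " -> " + newCap);
 *       cap = newCap; // 64, 256, 1024, 4096, 16384, 65536, 131072
 *   }
 */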
/**
! * Takes next task, if one exists, in lifo order.
*/
! private ForkJoinTask<?> localPop() {
ForkJoinTask<?> t = null;
! int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
! if ((a = array) != null && (cap = a.length) > 0 &&
! U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
! (t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
! updateTop(s);
! return t;
! }
!
! /**
+ * Takes next task, if one exists, in fifo order.
+ */
+ private ForkJoinTask<?> localPoll() {
+ ForkJoinTask<?> t = null;
+ int p = top, cap; ForkJoinTask<?>[] a;
+ if ((a = array) != null && (cap = a.length) > 0) {
+ for (int b = base; p - b > 0; ) {
+ int nb = b + 1;
+ long k = slotOffset((cap - 1) & b);
+ if (U.getReference(a, k) == null) {
+ if (nb == p)
+ break; // else base is lagging
+ while (b == (b = U.getIntAcquire(this, BASE)))
+ Thread.onSpinWait(); // spin to reduce memory traffic
}
! else if ((t = (ForkJoinTask<?>)
! U.getAndSetReference(a, k, null)) != null) {
updateBase(nb);
break;
}
! else
! b = base;
}
}
return t;
}
/**
* Takes next task, if one exists, using configured mode.
*/
final ForkJoinTask<?> nextLocalTask() {
! return (config & FIFO) == 0 ? localPop() : localPoll();
}
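/*
 * Hedged Deque analogy for the two modes (the real queues are
 * array-based): by default a worker pops its own most recently
 * pushed task; in asyncMode it polls the oldest.
 *
 *   Deque<Runnable> q = new ArrayDeque<>();
 *   q.push(first); q.push(second);
 *   q.pop();      // LIFO: second ((config & FIFO) == 0)
 *   q.pollLast(); // FIFO: first  (asyncMode)
 */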
/**
* Pops the given task only if it is at the current top.
* @param task the task. Caller must ensure non-null.
* Runs the given task, as well as remaining local tasks.
*/
final void topLevelExec(ForkJoinTask<?> task, int fifo) {
while (task != null) {
task.doExec();
! task = nextLocalTask(fifo);
}
}
/**
* Deep form of tryUnpush: Traverses from top and removes and
* Runs the given task, as well as remaining local tasks.
*/
final void topLevelExec(ForkJoinTask<?> task, int fifo) {
while (task != null) {
task.doExec();
! task = (fifo == 0) ? localPop() : localPoll();
}
}
/**
* Deep form of tryUnpush: Traverses from top and removes and
/**
* Cancels all local tasks. Called only by owner.
*/
final void cancelTasks() {
! for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
try {
t.cancel(false);
} catch (Throwable ignore) {
}
}
/**
* Cancels all local tasks. Called only by owner.
*/
final void cancelTasks() {
! for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
try {
t.cancel(false);
} catch (Throwable ignore) {
}
}
* Finishes initializing and records internal queue.
*
* @param w caller's WorkQueue
*/
final void registerWorker(WorkQueue w) {
! if (w != null && (runState & STOP) == 0L) {
ThreadLocalRandom.localInit();
int seed = w.stackPred = ThreadLocalRandom.getProbe();
int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
long stop = lockRunState() & STOP;
* Finishes initializing and records internal queue.
*
* @param w caller's WorkQueue
*/
final void registerWorker(WorkQueue w) {
! if (w != null) {
+ w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
ThreadLocalRandom.localInit();
int seed = w.stackPred = ThreadLocalRandom.getProbe();
int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
long stop = lockRunState() & STOP;
unlockRunState();
}
}
if ((tryTerminate(false, false) & STOP) == 0L &&
phase != 0 && w != null && w.source != DROPPED) {
- signalWork(); // possibly replace
w.cancelTasks(); // clean queue
}
if (ex != null)
ForkJoinTask.rethrow(ex);
}
/**
! * Releases an idle worker, or creates one if not enough exist.
*/
! final void signalWork() {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
long ac = (c + RC_UNIT) & RC_MASK, nc;
int sp = (int)c, i = sp & SMASK;
unlockRunState();
}
}
if ((tryTerminate(false, false) & STOP) == 0L &&
phase != 0 && w != null && w.source != DROPPED) {
w.cancelTasks(); // clean queue
+ signalWork(null, 0); // possibly replace
}
if (ex != null)
ForkJoinTask.rethrow(ex);
}
/**
! * Releases an idle worker, or creates one if not enough exist,
+ * giving up if array a is non-null and the task at a[k] was already taken.
*/
! final void signalWork(ForkJoinTask<?>[] a, int k) {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
long ac = (c + RC_UNIT) & RC_MASK, nc;
int sp = (int)c, i = sp & SMASK;
break;
WorkQueue w = qs[i], v = null;
if (sp == 0) {
if ((short)(c >>> TC_SHIFT) >= pc)
break;
! nc = ((c + TC_UNIT) & TC_MASK);
}
else if ((v = w) == null)
break;
else
! nc = (v.stackPred & LMASK) | (c & TC_MASK);
! if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
if (v == null)
createWorker();
else {
v.phase = sp;
if (v.parking != 0)
break;
WorkQueue w = qs[i], v = null;
if (sp == 0) {
if ((short)(c >>> TC_SHIFT) >= pc)
break;
! nc = ((c + TC_UNIT) & TC_MASK) | ac;
}
else if ((v = w) == null)
break;
else
! nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
! if (a != null && k < a.length && k >= 0 && a[k] == null)
+ break;
+ if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
if (v == null)
createWorker();
else {
v.phase = sp;
if (v.parking != 0)
*
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
if (w != null) {
! int phase = w.phase, r = w.stackPred; // seed from registerWorker
! int fifo = w.config & FIFO, nsteals = 0, src = -1;
! for (;;) {
! WorkQueue[] qs;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
! if ((runState & STOP) != 0L || (qs = queues) == null)
! break;
! int n = qs.length, i = r, step = (r >>> 16) | 1;
! boolean rescan = false;
! scan: for (int l = n; l > 0; --l, i += step) { // scan queues
! int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
! if ((q = qs[j = i & (n - 1)]) != null &&
! (a = q.array) != null && (cap = a.length) > 0) {
! for (int m = cap - 1, pb = -1, b = q.base;;) {
! ForkJoinTask<?> t; long k;
! t = (ForkJoinTask<?>)U.getReferenceAcquire(
! a, k = slotOffset(m & b));
! if (b != (b = q.base) || t == null ||
! !U.compareAndSetReference(a, k, t, null)) {
! if (a[b & m] == null) {
! if (rescan) // end of run
! break scan;
! if (a[(b + 1) & m] == null &&
! a[(b + 2) & m] == null) {
! break; // probably empty
! }
! if (pb == (pb = b)) { // track progress
- rescan = true; // stalled; reorder scan
- break scan;
- }
}
}
! else {
! boolean propagate;
! int nb = q.base = b + 1, prevSrc = src;
! w.nsteals = ++nsteals;
! w.source = src = j; // volatile
! rescan = true;
! int nh = t.noUserHelp();
! if (propagate =
! (prevSrc != src || nh != 0) && a[nb & m] != null)
! signalWork();
! w.topLevelExec(t, fifo);
! if ((b = q.base) != nb && !propagate)
- break scan; // reduce interference
}
}
}
}
- if (!rescan) {
- if (((phase = deactivate(w, phase)) & IDLE) != 0)
- break;
- src = -1; // re-enable propagation
- }
}
}
}
/**
! * Deactivates and if necessary awaits signal or termination.
*
! * @param w the worker
! * @param phase current phase
- * @return current phase, with IDLE set if worker should exit
*/
! private int deactivate(WorkQueue w, int phase) {
! if (w == null) // currently impossible
! return IDLE;
! int p = phase | IDLE, activePhase = phase + (IDLE << 1);
! long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
! int sp = w.stackPred = (int)pc; // set ctl stack link
! w.phase = p;
! if (!compareAndSetCtl(pc, qc)) // try to enqueue
! return w.phase = phase; // back out on possible signal
! int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
! if (((e = runState) & STOP) != 0L ||
! ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
! (qs = queues) == null || (n = qs.length) <= 0)
! return IDLE; // terminating
!
! for (int prechecks = Math.min(ac, 2), // reactivation threshold
! k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
! WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
! if (w.phase == activePhase)
! return activePhase;
! if (--k < 0)
! return awaitWork(w, p); // block, drop, or exit
! if ((q = qs[k & (n - 1)]) == null)
! Thread.onSpinWait();
! else if ((a = q.array) != null && (cap = a.length) > 0 &&
! a[q.base & (cap - 1)] != null && --prechecks < 0 &&
! (int)(c = ctl) == activePhase &&
! compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
! return w.phase = activePhase; // reactivate
}
}
/**
* Awaits signal or termination.
*
* @param w the work queue
! * @param p current phase (known to be idle)
! * @return current phase, with IDLE set if worker should exit
*/
! private int awaitWork(WorkQueue w, int p) {
! if (w != null) {
! ForkJoinWorkerThread t; long deadline;
! if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
! t.resetThreadLocals(); // clear before reactivate
! if ((ctl & RC_MASK) > 0L)
deadline = 0L;
! else if ((deadline =
! (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
! System.currentTimeMillis()) == 0L)
! deadline = 1L; // avoid zero
! int activePhase = p + IDLE;
! if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
! LockSupport.setCurrentBlocker(this);
! w.parking = 1; // enable unpark
! while ((p = w.phase) != activePhase) {
! boolean trimmable = false; int trim;
! Thread.interrupted(); // clear status
! if ((runState & STOP) != 0L)
break;
! if (deadline != 0L) {
! if ((trim = tryTrim(w, p, deadline)) > 0)
- break;
- else if (trim < 0)
- deadline = 0L;
- else
- trimmable = true;
- }
- U.park(trimmable, deadline);
}
! w.parking = 0;
! LockSupport.setCurrentBlocker(null);
}
}
! return p;
}
/**
* Tries to remove and deregister worker after timeout, and release
* another to do the same.
* @return > 0: trimmed, < 0 : not trimmable, else 0
*/
! private int tryTrim(WorkQueue w, int phase, long deadline) {
! long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
! if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
stat = -1; // no longer ctl top
! else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
stat = 0; // spurious wakeup
else if (!compareAndSetCtl(
c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
(TC_MASK & (c - TC_UNIT)))))
stat = -1; // lost race to signaller
*
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
if (w != null) {
! int r = w.stackPred; // seed from registerWorker
! int fifo = (int)config & FIFO;
! int nsteals = 0; // shadow w.nsteals
! boolean rescan = true;
+ WorkQueue[] qs; int n;
+ while ((rescan || deactivate(w) == 0) && (runState & STOP) == 0L &&
+ (qs = queues) != null && (n = qs.length) > 0) {
+ rescan = false;
+ int i = r, step = (r >>> 16) | 1;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
! scan: for (int j = n << 1; j != 0; --j, i += step) { // 2 sweeps
! WorkQueue q; int qid;
! if ((q = qs[qid = i & (n - 1)]) != null) {
! for (;;) { // poll queue q
! ForkJoinTask<?>[] a; int cap, b, m, nb, nk;
! if ((a = q.array) == null || (cap = a.length) <= 0)
! break;
! long bp = slotOffset((m = cap - 1) & (b = q.base));
! long np = slotOffset(nk = m & (nb = b + 1));
! ForkJoinTask<?> t = (ForkJoinTask<?>)
! U.getReferenceAcquire(a, bp);
! if (q.array != a || q.base != b ||
! U.getReference(a, bp) != t)
! continue; // inconsistent
! if (t == null) {
! if (rescan) { // end of run
! w.nsteals = nsteals;
! break scan;
! }
! if (U.getReference(a, np) != null) {
! rescan = true; // stalled; reorder scan
! break scan;
}
+ break; // probably empty
}
! if (U.compareAndSetReference(a, bp, t, null)) {
! q.base = nb;
! Object nt = U.getReferenceAcquire(a, np);
! if (!rescan) { // begin run
! rescan = true;
! w.source = qid;
! }
! ++nsteals;
! if (nt != null && // confirm a[nk]
! U.getReference(a, np) == nt)
! signalWork(a, nk); // propagate
! w.topLevelExec(t, fifo); // run t & its subtasks
}
}
}
}
}
}
}
/**
! * Deactivates and awaits signal or termination.
*
! * @param w the work queue
! * @return zero if now active
*/
! private int deactivate(WorkQueue w) {
! int idle = 1;
! if (w != null) { // always true; hoist checks
! int inactive = w.phase |= IDLE; // set status
! int activePhase = inactive + IDLE; // phase value when reactivated
! long ap = activePhase & LMASK, pc = ctl, qc;
! do { // enqueue
! qc = ap | ((pc - RC_UNIT) & UMASK);
! w.stackPred = (int)pc; // set ctl stack link
! } while (pc != (pc = compareAndExchangeCtl(pc, qc)));
!
! WorkQueue[] qs; int n; long e;
! if (((e = runState) & STOP) == 0 && // quiescence checks
! ((e & SHUTDOWN) == 0L || (qc & RC_MASK) > 0L || quiescent() <= 0) &&
! (qs = queues) != null && (n = qs.length) > 1) {
! long psp = pc & LMASK; // ctl predecessor prefix
! for (int i = 1; i < n; ++i) { // scan; stagger origins
! WorkQueue q; long c; // missed signal check
! if ((q = qs[(activePhase + i) & (n - 1)]) != null &&
! q.top - q.base > 0) {
! if ((idle = w.phase - activePhase) != 0 &&
! (int)(c = ctl) == activePhase &&
! compareAndSetCtl(c, psp | ((c + RC_UNIT) & UMASK))) {
! w.phase = activePhase;
! idle = 0; // reactivated
! } // else ineligible or lost race
! break;
! }
! }
+ if (idle != 0 && (idle = w.phase - activePhase) != 0)
+ idle = awaitWork(w, activePhase, n);
+ }
}
+ return idle;
}
/**
* Awaits signal or termination.
*
* @param w the work queue
! * @param activePhase w's next active phase
! * @param qsize current size of queues array
+ * @return zero if now active
*/
! private int awaitWork(WorkQueue w, int activePhase, int qsize) {
! int idle = 1;
! int spins = qsize | (qsize - 1); // approx traversal cost
! if (w != null) { // always true; hoist checks
! boolean trimmable; long deadline, c;
! long trimTime = (w.source == INVALID_ID) ? TIMEOUT_SLOP : keepAlive;
+ if ((w.config & CLEAR_TLS) != 0 && // instanceof check always true
+ Thread.currentThread() instanceof ForkJoinWorkerThread f)
+ f.resetThreadLocals(); // clear while accessing thread state
+ LockSupport.setCurrentBlocker(this);
+ if (trimmable = (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase))
+ deadline = trimTime + System.currentTimeMillis();
+ else
deadline = 0L;
! for (;;) {
! int s = spins, trim;
! Thread.interrupted(); // clear status
! if ((runState & STOP) != 0L)
! break;
! while ((idle = w.phase - activePhase) != 0 && --s != 0)
! Thread.onSpinWait(); // spin before blocking
! if (idle == 0)
! break;
! if (trimmable &&
! (trim = tryTrim(w, activePhase, deadline)) != 0) {
! if (trim > 0)
break;
! trimmable = false;
! deadline = 0L;
}
! w.parking = 1; // enable unpark and recheck
! if ((idle = w.phase - activePhase) != 0)
+ U.park(trimmable, deadline);
+ w.parking = 0; // close unpark window
+ if (idle == 0 || (idle = w.phase - activePhase) == 0)
+ break;
}
+ LockSupport.setCurrentBlocker(null);
}
! return idle;
}
/**
* Tries to remove and deregister worker after timeout, and release
* another to do the same.
* @return > 0: trimmed, < 0 : not trimmable, else 0
*/
! private int tryTrim(WorkQueue w, int activePhase, long deadline) {
! long c, nc; int stat, vp, i; WorkQueue[] vs; WorkQueue v;
! long waitTime = deadline - System.currentTimeMillis();
+ if ((int)(c = ctl) != activePhase || w == null)
stat = -1; // no longer ctl top
! else if (waitTime > TIMEOUT_SLOP)
stat = 0; // spurious wakeup
else if (!compareAndSetCtl(
c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
(TC_MASK & (c - TC_UNIT)))))
stat = -1; // lost race to signaller
// External operations
/**
* Finds and locks a WorkQueue for an external submitter, or
! * throws RejectedExecutionException if shutdown or terminating.
- * @param r current ThreadLocalRandom.getProbe() value
* @param rejectOnShutdown true if RejectedExecutionException
! * should be thrown when shutdown (else only if terminating)
*/
! private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
! int reuse; // nonzero if prefer create
! if ((reuse = r) == 0) {
! ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
! for (int probes = 0; ; ++probes) {
! int n, i, id; WorkQueue[] qs; WorkQueue q;
! if ((qs = queues) == null)
- break;
- if ((n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
! WorkQueue w = new WorkQueue(null, id, 0, false);
! w.phase = id;
! boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
! rejectOnShutdown);
- if (!reject && queues == qs && qs[i] == null)
- q = qs[i] = w; // else lost race to install
unlockRunState();
- if (q != null)
- return q;
- if (reject)
- break;
- reuse = 0;
}
! if (reuse == 0 || !q.tryLockPhase()) { // move index
! if (reuse == 0) {
! if (probes >= n >> 1)
! reuse = r; // stop preferring free slot
}
- else if (q != null)
- reuse = 0; // probe on collision
- r = ThreadLocalRandom.advanceProbe(r);
- }
- else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
- q.unlockPhase(); // check while q lock held
- break;
- }
- else
return q;
}
throw new RejectedExecutionException();
}
private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
// External operations
/**
* Finds and locks a WorkQueue for an external submitter, or
! * throws RejectedExecutionException if shut down.
* @param rejectOnShutdown true if RejectedExecutionException
! * should be thrown when shut down
*/
! final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
! int r;
! if ((r = ThreadLocalRandom.getProbe()) == 0) {
! ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
! for (;;) {
! WorkQueue q; WorkQueue[] qs; int n, id, i;
! if ((qs = queues) == null || (n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
! WorkQueue newq = new WorkQueue(null, id, 0, false);
! lockRunState();
! if (qs[i] == null && queues == qs)
! q = qs[i] = newq; // else lost race to install
unlockRunState();
}
! if (q != null && q.tryLockPhase()) {
! if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
! q.unlockPhase(); // check while q lock held
! break;
}
return q;
+ }
+ r = ThreadLocalRandom.advanceProbe(r); // move
}
throw new RejectedExecutionException();
}
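/*
 * Hedged sketch of the probe-based slot choice above: an external
 * submitter hashes its ThreadLocalRandom probe into the queues
 * array (EXTERNAL_ID_MASK currently yields even ids, disjoint from
 * the odd worker ids), moving to a new pseudo-random slot whenever
 * the chosen queue is missing or busy:
 *
 *   int r = ThreadLocalRandom.getProbe();
 *   WorkQueue q = qs[(r & EXTERNAL_ID_MASK) & (n - 1)];
 *   if (q == null || !q.tryLockPhase())        // missing or busy
 *       r = ThreadLocalRandom.advanceProbe(r); // rehash, retry
 */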
private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
internal = true;
q = wt.workQueue;
}
else { // find and lock queue
internal = false;
! q = submissionQueue(ThreadLocalRandom.getProbe(), true);
}
q.push(task, signalIfEmpty ? this : null, internal);
return task;
}
- /**
- * Returns queue for an external submission, bypassing call to
- * submissionQueue if already established and unlocked.
- */
- final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
- WorkQueue[] qs; WorkQueue q; int n;
- int r = ThreadLocalRandom.getProbe();
- return (((qs = queues) != null && (n = qs.length) > 0 &&
- (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
- q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
- }
-
/**
* Returns queue for an external thread, if one exists that has
* possibly ever submitted to the given pool (nonzero probe), or
* null if none.
*/
internal = true;
q = wt.workQueue;
}
else { // find and lock queue
internal = false;
! q = externalSubmissionQueue(true);
}
q.push(task, signalIfEmpty ? this : null, internal);
return task;
}
/**
* Returns queue for an external thread, if one exists that has
* possibly ever submitted to the given pool (nonzero probe), or
* null if none.
*/