src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
* illustrates some of the implementation decisions in this class.
*
* * Slot k must be read with an acquiring read, which it must
* anyway to dereference and run the task if the (acquiring)
* CAS succeeds, but uses an explicit acquire fence to support
! * the following rechecks even if the CAS is not attempted. To
- * more easily distinguish among kinds of CAS failures, we use
- * the compareAndExchange version, and usually handle null
- * returns (indicating contention) separately from others.
*
* * q.base may change between reading and using its value to
* index the slot. To avoid trying to use the wrong t, the
* index and slot must be reread (not necessarily immediately)
* until consistent, unless this is a local poll by owner, in
* illustrates some of the implementation decisions in this class.
*
* * Slot k must be read with an acquiring read, which it must
* anyway to dereference and run the task if the (acquiring)
* CAS succeeds, but uses an explicit acquire fence to support
! * the following rechecks even if the CAS is not attempted.
*
* * q.base may change between reading and using its value to
* index the slot. To avoid trying to use the wrong t, the
* index and slot must be reread (not necessarily immediately)
* until consistent, unless this is a local poll by owner, in
* opportunities to over-signal. Note that in either of these
* contexts, signals may be (and often are) unnecessary because
* active workers continue scanning after running tasks without
* the need to be signalled (which is one reason work stealing
* is often faster than alternatives), so additional workers
! * aren't needed. We filter out some of these cases by exiting
- * retry loops in signalWork if the task responsible for the
- * signal has already been taken.
*
* * For rapidly branching tasks that require full pool resources,
* oversignalling is OK, because signalWork will soon have no
* more workers to create or reactivate. But for others (mainly
* externally submitted tasks), overprovisioning may cause very
* noticeable slowdowns due to contention and resource
! * wastage. All remedies are intrinsically heuristic. We use a
! * strategy that works well in most cases: We track "sources"
! * (queue ids) of non-empty (usually polled) queues while
- * scanning. These are maintained in the "source" field of
- * WorkQueues for use in method helpJoin and elsewhere (see
- * below). We also maintain them as arguments/results of
- * top-level polls (argument "window" in method scan, with setup
- * in method runWorker) as an encoded sliding window of current
- * and previous two sources (or INVALID_ID if none), and stop
- * signalling when all were from the same source. These
- * mechanisms may result in transiently too few workers, but
- * once workers poll from a new source, they rapidly reactivate
- * others.
*
* * Despite these, signal contention and overhead effects still
* occur during ramp-up and ramp-down of small computations.
*
* Scanning. Method scan performs top-level scanning for (and
* opportunities to over-signal. Note that in either of these
* contexts, signals may be (and often are) unnecessary because
* active workers continue scanning after running tasks without
* the need to be signalled (which is one reason work stealing
* is often faster than alternatives), so additional workers
! * aren't needed.
*
* * For rapidly branching tasks that require full pool resources,
* oversignalling is OK, because signalWork will soon have no
* more workers to create or reactivate. But for others (mainly
* externally submitted tasks), overprovisioning may cause very
* noticeable slowdowns due to contention and resource
! * wastage. We reduce impact by deactivating workers when
! * queues don't have accessible tasks, but reactivating and
! * rescanning if other tasks remain.
*
* * Despite these, signal contention and overhead effects still
* occur during ramp-up and ramp-down of small computations.
*
* Scanning. Method scan performs top-level scanning for (and
* method as WorkQueue.poll(), but restarts with a different
* permutation on each invocation. The pseudorandom generator
* need not have high-quality statistical properties in the long
* term. We use Marsaglia XorShifts, seeded with the Weyl sequence
* from ThreadLocalRandom probes, which are cheap and
! * suffice. Scans do not otherwise explicitly take into account
! * core affinities, loads, cache localities, etc. However, they do
! * exploit temporal locality (which usually approximates these) by
! * preferring to re-poll from the same queue (either in method
! * tryPoll() or scan) after a successful poll before trying others
! * (see method topLevelExec), which also reduces bookkeeping,
! * cache traffic, and scanning overhead. But it also reduces
! * fairness, which is partially counteracted by giving up on
! * contention.
*
* Deactivation. When method scan indicates that no tasks are
! * found by a worker, it tries to deactivate (in awaitWork),
! * giving up (and rescanning) on ctl contention. To avoid missed
! * signals during deactivation, the method rescans and reactivates
! * if there may have been a missed signal during deactivation,
! * filtering out most cases in which this is unnecessary. Because
! * idle workers are often not yet blocked (parked), we use the
! * WorkQueue parking field to advertise that a waiter actually
! * needs unparking upon signal.
*
* Quiescence. Workers scan looking for work, giving up when they
* don't find any, without being sure that none are available.
* However, some required functionality relies on consensus about
* quiescence (also termination, discussed below). The count
* method as WorkQueue.poll(), but restarts with a different
* permutation on each invocation. The pseudorandom generator
* need not have high-quality statistical properties in the long
* term. We use Marsaglia XorShifts, seeded with the Weyl sequence
* from ThreadLocalRandom probes, which are cheap and
! * suffice. Each queue's polling attempt uses a bounded retry
! * (MAX_SCAN_RETRIES) to avoid becoming stuck when other
! * scanners/pollers stall. Scans do not otherwise explicitly take
! * into account core affinities, loads, cache localities, etc.
! * However, they do exploit temporal locality (which usually
! * approximates these) by preferring to re-poll from the same
! * queue after a successful poll before trying others, which also
! * reduces bookkeeping, cache traffic, and scanning overhead. But
! * it also reduces fairness, which is partially counteracted by
+ * giving up on contention.
*
* Deactivation. When method scan indicates that no tasks are
! * found by a worker, it tries to deactivate(), giving up (and
! * rescanning) on ctl contention. To avoid missed signals during
! * deactivation, the method rescans and reactivates if there may
! * have been a missed signal during deactivation, filtering out
! * most cases in which this is unnecessary. Because idle workers
! * are often not yet blocked (parked), we use the WorkQueue
! * parking field to advertise that a waiter actually needs
! * unparking upon signal.
*
* Quiescence. Workers scan looking for work, giving up when they
* don't find any, without being sure that none are available.
* However, some required functionality relies on consensus about
* quiescence (also termination, discussed below). The count
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
* time out and terminate if the pool has remained quiescent for
* the period given by field keepAlive (default 60sec), which applies
! * to the first timeout of a fully populated pool. Subsequent (or
! * other) cases use delays such that, if still quiescent, all will
! * be released before one additional keepAlive unit elapses.
*
* Joining Tasks
* =============
*
* The "Join" part of ForkJoinPools consists of a set of
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
* time out and terminate if the pool has remained quiescent for
* the period given by field keepAlive (default 60sec), which applies
! * to the first timeout of a quiescent pool. Subsequent cases use
! * minimal delays such that, if still quiescent, all will be
! * released soon thereafter.
*
* Joining Tasks
* =============
*
* The "Join" part of ForkJoinPools consists of a set of
* arrays are better for locality and reduce GC scan time, but
* large arrays reduce both direct false-sharing and indirect
* cases due to GC bookkeeping (cardmarks etc), and reduce the
* number of resizes, which are not especially fast because they
* require atomic transfers. Currently, arrays are initialized to
! * be fairly small. (Maintenance note: any changes in fields,
! * queues, or their uses, or JVM layout policies, must be
! * accompanied by re-evaluation of these placement and sizing
! * decisions.)
*
* Style notes
* ===========
*
* Memory ordering relies mainly on atomic operations (CAS,
* arrays are better for locality and reduce GC scan time, but
* large arrays reduce both direct false-sharing and indirect
* cases due to GC bookkeeping (cardmarks etc), and reduce the
* number of resizes, which are not especially fast because they
* require atomic transfers. Currently, arrays are initialized to
! * be fairly small but early resizes rapidly increase size by more
! * than a factor of two until very large. (Maintenance note: any
! * changes in fields, queues, or their uses, or JVM layout
! * policies, must be accompanied by re-evaluation of these
+ * placement and sizing decisions.)
*
* Style notes
* ===========
*
* Memory ordering relies mainly on atomic operations (CAS,
* (5) Internal control methods
* (6) Callbacks and other support for ForkJoinTask methods
* (7) Exported methods
* (8) Static block initializing statics in minimally dependent order
*
- * Revision notes
- * ==============
- *
- * The main sources of differences from previous version are:
- *
- * * New abstract class ForkJoinTask.InterruptibleTask ensures
- * handling of tasks submitted under the ExecutorService
- * API are consistent with specs.
- * * Method quiescent() replaces previous quiescence-related
- * checks, relying on versioning and sequence locking instead
- * of ReentrantLock.
- * * Termination processing now ensures that internal data
- * structures are maintained consistently enough while stopping
- * to interrupt all workers and cancel all tasks. It also uses a
- * CountDownLatch instead of a Condition for termination because
- * of lock change.
- * * Many other changes to avoid performance regressions due
- * to the above.
*/
// static configuration constants
/**
* to park waiting for new work before terminating.
*/
static final long DEFAULT_KEEPALIVE = 60_000L;
/**
! * Undershoot tolerance for idle timeouts
*/
static final long TIMEOUT_SLOP = 20L;
/**
* The default value for common pool maxSpares. Overridable using
* to park waiting for new work before terminating.
*/
static final long DEFAULT_KEEPALIVE = 60_000L;
/**
! * Undershoot tolerance for idle timeouts, also serving as the
+ * minimum allowed timeout value.
*/
static final long TIMEOUT_SLOP = 20L;
/**
* The default value for common pool maxSpares. Overridable using
// {pool, workQueue} config bits
static final int FIFO = 1 << 0; // fifo queue or access mode
static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers
static final int PRESET_SIZE = 1 << 2; // size was set by property
- // source history window packing used in scan() and runWorker()
- static final long RESCAN = 1L << 63; // must be negative
- static final long HMASK = ((((long)SMASK) << 32) |
- (((long)SMASK) << 16)); // history bits
- static final long NO_HISTORY = ((((long)INVALID_ID) << 32) | // no 3rd
- (((long)INVALID_ID) << 16)); // no 2nd
-
// others
static final int DEREGISTERED = 1 << 31; // worker terminating
static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
static final int IDLE = 1 << 16; // phase seqlock/version count
/*
* Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
* RC: Number of released (unqueued) workers
* TC: Number of total workers
// {pool, workQueue} config bits
static final int FIFO = 1 << 0; // fifo queue or access mode
static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers
static final int PRESET_SIZE = 1 << 2; // size was set by property
// others
static final int DEREGISTERED = 1 << 31; // worker terminating
static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
static final int IDLE = 1 << 16; // phase seqlock/version count
+ static final int MAX_SCAN_RETRIES = 8; // scan retry limit
/*
* Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
* RC: Number of released (unqueued) workers
* TC: Number of total workers
// Support for atomic operations
private static final Unsafe U;
private static final long PHASE;
private static final long BASE;
private static final long TOP;
- private static final long SOURCE;
private static final long ARRAY;
final void updateBase(int v) {
U.putIntVolatile(this, BASE, v);
}
final void updateTop(int v) {
U.putIntOpaque(this, TOP, v);
}
- final void forgetSource() {
- U.putIntOpaque(this, SOURCE, 0);
- }
final void updateArray(ForkJoinTask<?>[] a) {
U.getAndSetReference(this, ARRAY, a);
}
final void unlockPhase() {
U.getAndAddInt(this, PHASE, IDLE);
* @param internal if caller owns this queue
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool,
boolean internal) {
! int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask<?>[] a;
! if ((a = array) == null || (cap = a.length) <= 0 ||
! (room = (m = cap - 1) - (s - b)) < 0) { // could not resize
if (!internal)
unlockPhase();
! throw new RejectedExecutionException("Queue capacity exceeded");
}
! top = s + 1;
! long pos = slotOffset(p = m & s);
! if (!internal)
! U.putReference(a, pos, task); // inside lock
! else
! U.getAndSetReference(a, pos, task); // fully fenced
! if (room == 0 && (newCap = cap << 1) > 0) {
ForkJoinTask<?>[] newArray = null;
! try { // resize for next time
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
}
if (newArray != null) { // else throw on next push
! int newMask = newCap - 1; // poll old, push to new
for (int k = s, j = cap; j > 0; --j, --k) {
! ForkJoinTask<?> u;
if ((u = (ForkJoinTask<?>)U.getAndSetReference(
! a, slotOffset(k & m), null)) == null)
break; // lost to pollers
newArray[k & newMask] = u;
}
! updateArray(newArray); // fully fenced
}
- a = null; // always signal
}
- if (!internal)
- unlockPhase();
- if ((a == null || a[m & (s - 1)] == null) && pool != null)
- pool.signalWork(a, p);
}
/**
* Takes next task, if one exists, in order specified by mode,
* so acts as either local-pop or local-poll. Called only by owner.
* @param internal if caller owns this queue
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool,
boolean internal) {
! int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
! if ((a = array) != null && (cap = a.length) > 0) { // else disabled
! if ((room = (m = cap - 1) - (s - b)) >= 0) {
+ top = s + 1;
+ long pos = slotOffset(m & s);
+ if (!internal)
+ U.putReference(a, pos, task); // inside lock
+ else
+ U.getAndSetReference(a, pos, task); // fully fenced
+ if (room == 0) // resize
+ growArray(a, cap, s);
+ }
if (!internal)
unlockPhase();
! if (room < 0)
+ throw new RejectedExecutionException("Queue capacity exceeded");
+ else if ((room == 0 || a[m & (s - 1)] == null) && pool != null)
+ pool.signalWork();
}
! }
!
! /**
! * Resizes the queue array unless out of memory.
! * @param a old array
! * @param cap old array capacity
! * @param s current top
+ */
+ private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
+ int newCap = (cap < 1 << 24) ? cap << 2 : cap << 1;
+ if (a != null && a.length == cap && cap > 0 && newCap > 0) {
ForkJoinTask<?>[] newArray = null;
! try {
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
}
if (newArray != null) { // else throw on next push
! int mask = cap - 1, newMask = newCap - 1;
for (int k = s, j = cap; j > 0; --j, --k) {
! ForkJoinTask<?> u; // poll old, push to new
if ((u = (ForkJoinTask<?>)U.getAndSetReference(
! a, slotOffset(k & mask), null)) == null)
break; // lost to pollers
newArray[k & newMask] = u;
}
! updateArray(newArray); // fully fenced
}
}
}
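
A minimal single-threaded sketch of the resize policy in growArray above, for readers who want to check the index arithmetic. The class and method names (`GrowSketch`, `nextCapacity`, `grow`) are hypothetical, and plain array reads and writes stand in for the `getAndSetReference` transfers, so this shows only the sizing and re-masking, not the concurrent hand-off to pollers.

```java
final class GrowSketch {
    /** Growth policy as in growArray: quadruple below 2^24 slots, otherwise double. */
    static int nextCapacity(int cap) {
        return (cap < (1 << 24)) ? cap << 2 : cap << 1;
    }

    /**
     * Moves tasks from a full power-of-two array into a larger one, walking
     * down from index s (the slot just filled) and stopping at the first
     * empty slot. Returns the old array unchanged if the new capacity
     * would overflow.
     */
    static Object[] grow(Object[] old, int s) {
        int cap = old.length, newCap = nextCapacity(cap);
        if (cap <= 0 || newCap <= 0)
            return old;                       // cannot grow
        Object[] next = new Object[newCap];
        int mask = cap - 1, newMask = newCap - 1;
        for (int k = s, j = cap; j > 0; --j, --k) {
            Object u = old[k & mask];
            if (u == null)
                break;                        // in the real code: lost to pollers
            old[k & mask] = null;             // take from old slot
            next[k & newMask] = u;            // same logical index, wider mask
        }
        return next;
    }

    public static void main(String[] args) {
        Object[] old = new Object[4];
        for (int i = 6; i <= 9; ++i)
            old[i & 3] = "t" + i;             // logical indices 6..9 wrap in 4 slots
        Object[] next = grow(old, 9);
        System.out.println(next.length + " " + next[6] + " " + next[9]);
    }
}
```

Because each task keeps its logical index `k` and only the mask changes, `base` and `top` need no adjustment after the transfer.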
/**
* Takes next task, if one exists, in order specified by mode,
* so acts as either local-pop or local-poll. Called only by owner.
*/
private ForkJoinTask<?> nextLocalTask(int fifo) {
ForkJoinTask<?> t = null;
ForkJoinTask<?>[] a = array;
int b = base, p = top, cap;
! if (a != null && (cap = a.length) > 0) {
! for (int m = cap - 1, s, nb; p - b > 0; ) {
if (fifo == 0 || (nb = b + 1) == p) {
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(m & (s = p - 1)), null)) != null)
updateTop(s); // else lost race for only task
break;
*/
private ForkJoinTask<?> nextLocalTask(int fifo) {
ForkJoinTask<?> t = null;
ForkJoinTask<?>[] a = array;
int b = base, p = top, cap;
! if (p - b > 0 && a != null && (cap = a.length) > 0) {
! for (int m = cap - 1, s, nb;;) {
if (fifo == 0 || (nb = b + 1) == p) {
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(m & (s = p - 1)), null)) != null)
updateTop(s); // else lost race for only task
break;
}
while (b == (b = base)) {
U.loadFence();
Thread.onSpinWait(); // spin to reduce memory traffic
}
+ if (p - b <= 0)
+ break;
}
}
return t;
}
* Polls for a task. Used only by non-owners.
*
* @param pool if nonnull, pool to signal if more tasks exist
*/
final ForkJoinTask<?> poll(ForkJoinPool pool) {
! for (;;) {
! ForkJoinTask<?>[] a = array;
! int b = base, cap, k;
! if (a == null || (cap = a.length) <= 0)
! break;
! ForkJoinTask<?> t = a[k = b & (cap - 1)];
! U.loadFence();
! if (base == b) {
! Object o;
! int nb = b + 1, nk = nb & (cap - 1);
! if (t == null)
! o = a[k];
! else if (t == (o = U.compareAndExchangeReference(
! a, slotOffset(k), t, null))) {
updateBase(nb);
if (a[nk] != null && pool != null)
! pool.signalWork(a, nk); // propagate
return t;
}
- if (o == null && a[nk] == null && array == a &&
- (phase & (IDLE | 1)) != 0 && top - base <= 0)
- break; // empty
- }
- }
- return null;
- }
-
- /**
- * Tries to poll next task in FIFO order, failing without
- * retries on contention or stalls. Used only by topLevelExec
- * to repoll from the queue obtained from pool.scan.
- */
- private ForkJoinTask<?> tryPoll() {
- ForkJoinTask<?>[] a; int cap;
- if ((a = array) != null && (cap = a.length) > 0) {
- for (int b = base, k;;) { // loop only if inconsistent
- ForkJoinTask<?> t = a[k = b & (cap - 1)];
- U.loadFence();
- if (b == (b = base)) {
- Object o;
- if (t == null)
- o = a[k];
- else if (t == (o = U.compareAndExchangeReference(
- a, slotOffset(k), t, null))) {
- updateBase(b + 1);
- return t;
- }
- if (o == null)
- break;
- }
}
}
return null;
}
// specialized execution methods
/**
! * Runs the given (stolen) task if nonnull, as well as
- * remaining local tasks and/or others available from the
- * given queue, if any.
*/
! final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, int srcId) {
! int cfg = config, fifo = cfg & FIFO, nstolen = nsteals + 1;
- if ((srcId & 1) != 0) // don't record external sources
- source = srcId;
- if ((cfg & CLEAR_TLS) != 0)
- ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
while (task != null) {
task.doExec();
! if ((task = nextLocalTask(fifo)) == null && src != null &&
- (task = src.tryPoll()) != null)
- ++nstolen;
}
! nsteals = nstolen;
! forgetSource();
}
/**
* Deep form of tryUnpush: Traverses from top and removes and
* runs task if present.
* Polls for a task. Used only by non-owners.
*
* @param pool if nonnull, pool to signal if more tasks exist
*/
final ForkJoinTask<?> poll(ForkJoinPool pool) {
! if ((phase & (IDLE | 1)) == 0 || top - base > 0) {
! for (boolean stalled = false;;) {
! int cap, b, k, nb; ForkJoinTask<?>[] a;
! if ((a = array) == null || (cap = a.length) <= 0)
! break;
! long kp = slotOffset(k = (cap - 1) & (b = base));
! int nk = (nb = b + 1) & (cap - 1); // next slot
! int sk = (b + 2) & (cap - 1); // 2nd slot ahead
! ForkJoinTask<?> t = a[k];
! U.loadFence();
! if (base != b)
! ; // inconsistent
! else if (t == null) {
! if (a[sk] == null && a[nk] == null && a[k] == null &&
+ top - b <= 0)
+ break; // empty
+ if (stalled)
+ Thread.onSpinWait();
+ stalled = true;
+ U.loadFence(); // reread
+ }
+ else if (U.compareAndSetReference(a, kp, t, null)) {
updateBase(nb);
if (a[nk] != null && pool != null)
! pool.signalWork(); // propagate
return t;
}
}
}
return null;
}
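
The read, recheck, CAS sequence in the new poll() can be sketched with public atomics. This is an illustration under stated assumptions, not the JDK code: `PollSketch` is a hypothetical class, `AtomicReferenceArray` and volatile fields replace the `Unsafe` accesses and explicit fences, the look-ahead at the next two slots is omitted, and push assumes a single producer with spare capacity.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class PollSketch<T> {
    private final AtomicReferenceArray<T> slots;
    private volatile int base;   // next index to poll
    private volatile int top;    // next index to push

    PollSketch(int capacityPowerOfTwo) {
        slots = new AtomicReferenceArray<>(capacityPowerOfTwo);
    }

    /** Owner-side push; assumes one producer and that the array is not full. */
    void push(T task) {
        int s = top;
        slots.set(s & (slots.length() - 1), task);
        top = s + 1;
    }

    /** Non-owner poll: returns the oldest task, or null if the queue looks empty. */
    T poll() {
        for (boolean stalled = false;;) {
            int b = base, k = b & (slots.length() - 1);
            T t = slots.get(k);
            if (base != b)
                continue;                     // base moved; reread index and slot
            if (t == null) {
                if (top - b <= 0)
                    return null;              // empty
                if (stalled)
                    Thread.onSpinWait();      // another poller took t but has not advanced base
                stalled = true;
            }
            else if (slots.compareAndSet(k, t, null)) {
                base = b + 1;                 // only the CAS winner advances base
                return t;
            }
        }
    }

    public static void main(String[] args) {
        PollSketch<String> q = new PollSketch<>(8);
        q.push("a");
        q.push("b");
        System.out.println(q.poll() + " " + q.poll() + " " + q.poll());
    }
}
```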
// specialized execution methods
/**
! * Runs the given task, as well as remaining local tasks.
*/
! final void topLevelExec(ForkJoinTask<?> task, int cfg) {
! int fifo = cfg & FIFO;
while (task != null) {
task.doExec();
! task = nextLocalTask(fifo);
}
! if ((cfg & CLEAR_TLS) != 0)
! ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
}
/**
* Deep form of tryUnpush: Traverses from top and removes and
* runs task if present.
U = Unsafe.getUnsafe();
Class<WorkQueue> klass = WorkQueue.class;
PHASE = U.objectFieldOffset(klass, "phase");
BASE = U.objectFieldOffset(klass, "base");
TOP = U.objectFieldOffset(klass, "top");
- SOURCE = U.objectFieldOffset(klass, "source");
ARRAY = U.objectFieldOffset(klass, "array");
}
}
// static fields (initialized in static initializer below)
if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
w.source = DEREGISTERED;
if (phase != 0) { // else failed to start
replaceable = true;
if ((phase & IDLE) != 0)
! reactivate(w); // pool stopped before released
}
}
}
! long c = ctl;
! if (src != DEREGISTERED) // decrement counts
do {} while (c != (c = compareAndExchangeCtl(
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(LMASK & c)))));
! else if ((int)c != 0)
! replaceable = true; // signal below to cascade timeouts
! if (w != null) { // cancel remaining tasks
! for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
! try {
! t.cancel(false);
- } catch (Throwable ignore) {
}
}
}
if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
WorkQueue[] qs; int n, i; // remove index unless terminating
long ns = w.nsteals & 0xffffffffL;
! int stop = lockRunState() & STOP;
! if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0 &&
! qs[i = phase & SMASK & (n - 1)] == w) {
qs[i] = null;
stealCount += ns; // accumulate steals
}
unlockRunState();
}
- if ((runState & STOP) == 0 && replaceable)
- signalWork(null, 0); // may replace unless trimmed or uninitialized
if (ex != null)
ForkJoinTask.rethrow(ex);
}
/**
! * Releases an idle worker, or creates one if not enough exist,
- * returning on contention if a signal task is already taken.
- *
- * @param a if nonnull, a task array holding task signalled
- * @param k index of task in array
*/
! final void signalWork(ForkJoinTask<?>[] a, int k) {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
long ac = (c + RC_UNIT) & RC_MASK, nc;
int sp = (int)c, i = sp & SMASK;
! if (qs == null || qs.length <= i)
break;
WorkQueue w = qs[i], v = null;
if (sp == 0) {
if ((short)(c >>> TC_SHIFT) >= pc)
break;
nc = ((c + TC_UNIT) & TC_MASK);
}
! else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null)
break;
! else
nc = (v.stackPred & LMASK) | (c & TC_MASK);
if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
if (v == null)
createWorker();
else {
v.phase = sp;
if (v.parking != 0)
! U.unpark(v.owner);
}
break;
}
- if (a != null && k >= 0 && k < a.length && a[k] == null)
- break;
}
}
/**
* Reactivates the given worker, and possibly others if not top of
* ctl stack. Called only during shutdown to ensure release on
* termination.
*/
! private void reactivate(WorkQueue w) {
for (long c = ctl;;) {
WorkQueue[] qs; WorkQueue v; int sp, i;
! if ((qs = queues) == null || (sp = (int)c) == 0 ||
! qs.length <= (i = sp & SMASK) || (v = qs[i]) == null ||
- (v != w && w != null && (w.phase & IDLE) == 0))
break;
if (c == (c = compareAndExchangeCtl(
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
(v.stackPred & LMASK))))) {
v.phase = sp;
! if (v == w)
- break;
- if (v.parking != 0)
- U.unpark(v.owner);
}
}
}
/**
* Internal version of isQuiescent and related functionality.
! * @return true if terminating or all workers are inactive and
! * submission queues are empty and unlocked; if so, setting STOP
! * if shutdown is enabled
*/
! private boolean quiescent() {
outer: for (;;) {
long phaseSum = 0L;
boolean swept = false;
for (int e, prevRunState = 0; ; prevRunState = e) {
long c = ctl;
if (((e = runState) & STOP) != 0)
! return true; // terminating
else if ((c & RC_MASK) > 0L)
! return false; // at least one active
else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
long sum = c;
WorkQueue[] qs = queues; WorkQueue q;
int n = (qs == null) ? 0 : qs.length;
for (int i = 0; i < n; ++i) { // scan queues
if ((q = qs[i]) != null) {
int p = q.phase, s = q.top, b = q.base;
sum += (p & 0xffffffffL) | ((long)b << 32);
if ((p & IDLE) == 0 || s - b > 0) {
if ((i & 1) == 0 && compareAndSetCtl(c, c))
! signalWork(null, 0); // ensure live
! return false;
}
}
}
swept = (phaseSum == (phaseSum = sum));
}
else if ((e & SHUTDOWN) == 0)
! return true;
else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
! interruptAll(); // confirmed
! return true; // enable termination
}
else
break; // restart
}
}
if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
w.source = DEREGISTERED;
if (phase != 0) { // else failed to start
replaceable = true;
if ((phase & IDLE) != 0)
! releaseAll(); // pool stopped before released
}
}
}
! if (src != DEREGISTERED) { // decrement counts
! long c = ctl;
do {} while (c != (c = compareAndExchangeCtl(
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(LMASK & c)))));
! if (w != null && w.top - w.base > 0) {
! for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
! try { // cancel remaining tasks
! t.cancel(false);
! } catch (Throwable ignore) {
! }
}
}
}
if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
WorkQueue[] qs; int n, i; // remove index unless terminating
long ns = w.nsteals & 0xffffffffL;
! if ((lockRunState() & STOP) != 0)
! replaceable = false;
! else if ((qs = queues) != null && (n = qs.length) > 0 &&
+ qs[i = phase & SMASK & (n - 1)] == w) {
qs[i] = null;
stealCount += ns; // accumulate steals
}
unlockRunState();
+ if (replaceable)
+ signalWork();
}
if (ex != null)
ForkJoinTask.rethrow(ex);
}
/**
! * Releases an idle worker, or creates one if not enough exist.
*/
! final void signalWork() {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
long ac = (c + RC_UNIT) & RC_MASK, nc;
int sp = (int)c, i = sp & SMASK;
! if ((short)(c >>> RC_SHIFT) >= pc)
+ break;
+ if (qs == null)
+ break;
+ if (qs.length <= i)
break;
WorkQueue w = qs[i], v = null;
+ Thread t = null;
if (sp == 0) {
if ((short)(c >>> TC_SHIFT) >= pc)
break;
nc = ((c + TC_UNIT) & TC_MASK);
}
! else if ((v = w) == null)
break;
! else {
+ t = v.owner;
nc = (v.stackPred & LMASK) | (c & TC_MASK);
+ }
if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
if (v == null)
createWorker();
else {
v.phase = sp;
if (v.parking != 0)
! U.unpark(t);
}
break;
}
}
}
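
To make the ctl arithmetic in signalWork easier to follow, here is a sketch of the field updates in isolation. The shift positions (`RC_SHIFT = 48`, `TC_SHIFT = 32`) and the helper names are assumptions for illustration, since the constants are not shown in this hunk; the sketch computes the value a signaller would try to CAS in and leaves out the CAS loop, worker creation, and unparking.

```java
final class CtlSketch {
    // Assumed layout: RC in bits 48..63, TC in bits 32..47, stack top in the low 32 bits.
    static final int RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_UNIT = 1L << RC_SHIFT, TC_UNIT = 1L << TC_SHIFT;
    static final long RC_MASK = 0xffffL << RC_SHIFT, TC_MASK = 0xffffL << TC_SHIFT;
    static final long LMASK = 0xffffffffL;

    static long pack(int released, int total, int stackTop) {
        return (((long) released << RC_SHIFT) & RC_MASK)
             | (((long) total << TC_SHIFT) & TC_MASK)
             | (stackTop & LMASK);
    }

    static int released(long c) { return (short) (c >>> RC_SHIFT); }
    static int total(long c)    { return (short) (c >>> TC_SHIFT); }

    /**
     * Returns the ctl value a signal would attempt to install, or c itself
     * when no worker should be released or created.
     * stackPredOfTop is the stackPred link of the idle worker on top of the stack.
     */
    static long nextOnSignal(long c, int parallelism, int stackPredOfTop) {
        if (released(c) >= parallelism)
            return c;                              // enough active workers
        long ac = (c + RC_UNIT) & RC_MASK, nc;
        if ((int) c == 0) {                        // no idle workers: create one
            if (total(c) >= parallelism)
                return c;
            nc = (c + TC_UNIT) & TC_MASK;
        }
        else                                       // pop the top idle worker
            nc = (stackPredOfTop & LMASK) | (c & TC_MASK);
        return nc | ac;
    }

    public static void main(String[] args) {
        long c = pack(2, 3, 0);
        long n = nextOnSignal(c, 4, 0);
        System.out.println(released(n) + " released, " + total(n) + " total");
    }
}
```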
/**
* Reactivates all idle workers queued on the ctl stack, unparking
* each. Called only during shutdown to ensure release on
* termination.
*/
! private void releaseAll() {
for (long c = ctl;;) {
WorkQueue[] qs; WorkQueue v; int sp, i;
! if ((sp = (int)c) == 0 || (qs = queues) == null ||
! qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
break;
if (c == (c = compareAndExchangeCtl(
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
(v.stackPred & LMASK))))) {
v.phase = sp;
! U.unpark(v.owner);
}
}
}
/**
* Internal version of isQuiescent and related functionality.
! * @return positive if stopping or terminating, zero if all workers
! * are inactive and submission queues are empty and unlocked (setting
! * STOP and returning positive if shutdown is enabled), else negative
*/
! private int quiescent() {
outer: for (;;) {
long phaseSum = 0L;
boolean swept = false;
for (int e, prevRunState = 0; ; prevRunState = e) {
long c = ctl;
if (((e = runState) & STOP) != 0)
! return 1; // terminating
else if ((c & RC_MASK) > 0L)
! return -1; // at least one active
else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
long sum = c;
WorkQueue[] qs = queues; WorkQueue q;
int n = (qs == null) ? 0 : qs.length;
for (int i = 0; i < n; ++i) { // scan queues
if ((q = qs[i]) != null) {
int p = q.phase, s = q.top, b = q.base;
sum += (p & 0xffffffffL) | ((long)b << 32);
if ((p & IDLE) == 0 || s - b > 0) {
if ((i & 1) == 0 && compareAndSetCtl(c, c))
! signalWork();
! return -1;
}
}
}
swept = (phaseSum == (phaseSum = sum));
}
else if ((e & SHUTDOWN) == 0)
! return 0;
else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
! releaseAll(); // confirmed
! return 1; // enable termination
}
else
break; // restart
}
}
* See above for explanation.
*
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
! if (w != null) {
! int phase = w.phase, r = w.stackPred; // seed from registerWorker
- long window = (long)((r >>> 16) & SMASK) | NO_HISTORY;
do {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
! } while ((runState & STOP) == 0 &&
! (((window = scan(w, window, r)) < 0L ||
! ((phase = awaitWork(w, phase)) & IDLE) == 0)));
}
}
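
The xorshift step in runWorker and the odd stride derived from it in scan (`(r << 1) | 1`) can be checked in isolation. The sketch below uses hypothetical names (`ScanOrderSketch`, `visitOrder`) and shows only why an odd stride over a power-of-two number of queues visits every index exactly once per pass; it does not poll anything.

```java
final class ScanOrderSketch {
    /** One Marsaglia xorshift step with the 13/17/5 shifts used in runWorker. */
    static int xorShift(int r) {
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
        return r;
    }

    /**
     * Indices visited in one pass over n queues (n a power of two), starting
     * at origin and advancing by an odd stride derived from r.
     */
    static int[] visitOrder(int origin, int r, int n) {
        int step = (r << 1) | 1;              // always odd, so coprime with n
        int[] order = new int[n];
        for (int i = origin, l = 0; l < n; ++l, i += step)
            order[l] = i & (n - 1);           // int overflow is harmless under the mask
        return order;
    }

    public static void main(String[] args) {
        int r = xorShift(1);
        System.out.println(r + " " + java.util.Arrays.toString(visitOrder(3, r, 8)));
    }
}
```

Each call with a fresh `r` yields a different permutation, which is the property the scanning notes rely on; statistical quality beyond that is not required.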
/**
* Scans for and if found executes top-level tasks: Tries to poll
! * each queue starting at initial index with random stride,
! * returning next scan window and retry indicator.
*
* @param w caller's WorkQueue
- * @param window up to three queue indices
* @param r random seed
! * @return the next window to use, with RESCAN set for rescan
*/
! private long scan(WorkQueue w, long window, int r) {
! WorkQueue[] qs = queues;
! int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
! long next = window & ~RESCAN;
! outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
! int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
! if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
! (a = q.array) != null && (cap = a.length) > 0) {
! for (int b = q.base;;) {
! int nb = b + 1, nk = nb & (cap - 1), k;
! ForkJoinTask<?> t = a[k = b & (cap - 1)];
- U.loadFence(); // re-read b and t
- if (b == (b = q.base)) { // else inconsistent; retry
- Object o;
- if (t == null)
- o = a[k];
- else if (t == (o = U.compareAndExchangeReference(
- a, slotOffset(k), t, null))) {
- q.updateBase(nb);
- next = RESCAN | ((window << 16) & HMASK) | j;
- if (window != next && a[nk] != null)
- signalWork(a, nk); // limit propagation
- if (w != null) // always true
- w.topLevelExec(t, q, j);
- break outer;
- }
- if (o == null) {
- if (next >= 0L && a[nk] != null)
- next |= RESCAN;
break;
}
}
}
}
}
! return next;
}
/**
! * Tries to inactivate, and if successful, awaits signal or termination.
*
* @param w the worker (may be null if already terminated)
* @param phase current phase
! * @return current phase, with IDLE set if worker should exit
! */
! private int awaitWork(WorkQueue w, int phase) {
! boolean quiet; // true if possibly quiescent
! int active = phase + (IDLE << 1), p = phase | IDLE, e;
! if (w != null) {
! w.phase = p; // deactivate
! long np = active & LMASK, pc = ctl; // try to enqueue
- long qc = np | ((pc - RC_UNIT) & UMASK);
w.stackPred = (int)pc; // set ctl stack link
! if (pc != (pc = compareAndExchangeCtl(pc, qc))) {
! qc = np | ((pc - RC_UNIT) & UMASK);
! w.stackPred = (int)pc; // retry once
! if (pc != (pc = compareAndExchangeCtl(pc, qc)))
- p = w.phase = phase; // back out
- }
- if (p != phase && ((e = runState) & STOP) == 0 &&
- (!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 ||
- !(quiet = quiescent()) || (runState & STOP) == 0)) {
- long deadline = 0L; // not terminating
- if (quiet) { // use timeout if trimmable
- int nt = (short)(qc >>> TC_SHIFT);
- long delay = keepAlive; // scale if not at target
- if (nt != (nt = Math.max(nt, parallelism)) && nt > 0)
- delay = Math.max(TIMEOUT_SLOP, delay / nt);
- if ((deadline = delay + System.currentTimeMillis()) == 0L)
- deadline = 1L; // avoid zero
- }
- boolean release = quiet;
WorkQueue[] qs = queues; // recheck queues
int n = (qs == null) ? 0 : qs.length;
for (int l = -n, j = active; l < n; ++l, ++j) {
! WorkQueue q; ForkJoinTask<?>[] a; int cap;
! if ((p = w.phase) == active) // interleave signal checks
! break;
! if ((q = qs[j & (n - 1)]) != null &&
! (a = q.array) != null && (cap = a.length) > 0 &&
! a[q.base & (cap - 1)] != null) {
! if (release && qc == ctl && compareAndSetCtl(qc, pc)) {
p = w.phase = active;
break; // possible missed signal
}
- release = true; // track multiple or reencounter
}
Thread.onSpinWait(); // reduce memory traffic
}
- if (p != active) { // emulate LockSupport.park
- LockSupport.setCurrentBlocker(this);
- w.parking = 1;
- for (;;) {
- if ((runState & STOP) != 0 || (p = w.phase) == active)
- break;
- U.park(deadline != 0L, deadline);
- if ((p = w.phase) == active || (runState & STOP) != 0)
- break;
- Thread.interrupted(); // clear for next park
- if (deadline != 0L && TIMEOUT_SLOP >
- deadline - System.currentTimeMillis()) {
- long sp = w.stackPred & LMASK, c = ctl;
- long nc = sp | (UMASK & (c - TC_UNIT));
- if (((int)c & SMASK) == (active & SMASK) &&
- compareAndSetCtl(c, nc)) {
- w.source = DEREGISTERED;
- w.phase = active;
- break; // trimmed on timeout
- }
- deadline = 0L; // no longer trimmable
- }
- }
- w.parking = 0;
- LockSupport.setCurrentBlocker(null);
- }
}
}
return p;
}
/**
* Scans for and returns a polled task, if available. Used only
* for untracked polls. Begins scan at a random index to avoid
* systematic unfairness.
*
* See above for explanation.
*
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
! if (w != null) { // use seed from registerWorker
! int phase = w.phase, r = w.stackPred, cfg = w.config, stop;
do {
+ stop = runState & STOP;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
! } while (stop == 0 &&
! (scan(w, r, cfg) ||
! ((phase = deactivate(w, phase)) & IDLE) == 0 ||
+ ((phase = awaitWork(w, phase)) & IDLE) == 0));
}
}
/**
* Scans for and if found executes top-level tasks: Tries to poll
! * each queue starting at random index with random stride,
! * returning retry indicator.
*
* @param w caller's WorkQueue
* @param r random seed
! * @param cfg config bits
+ * @return true for rescan
*/
! private boolean scan(WorkQueue w, int r, int cfg) {
! WorkQueue[] qs; int n;
! boolean rescan = false;
! if ((qs = queues) != null && (n = qs.length) > 0 && w != null) {
! int i = r, step = (r >>> 16) | 1, nstolen = 0;
! sweep: for (int l = n; l > 0; --l, i += step) {
! int j; WorkQueue q;
! if ((q = qs[j = i & (n - 1)]) != null) {
! for (int retries = 0;;) {
! int cap, b, k, nb; ForkJoinTask<?>[] a;
! if ((a = q.array) == null || (cap = a.length) <= 0)
break;
+ long kp = slotOffset(k = (cap - 1) & (b = q.base));
+ int nk = (nb = b + 1) & (cap - 1); // next slot
+ int sk = (b + 2) & (cap - 1); // 2nd slot ahead
+ ForkJoinTask<?> t = a[k];
+ U.loadFence();
+ if (q.base != b)
+ ; // inconsistent
+ else if (t == null) {
+ if (a[sk] == null && a[nk] == null && a[k] == null)
+ break; // probably empty
+ U.loadFence(); // reread
+ }
+ else if (U.compareAndSetReference(a, kp, t, null)) {
+ q.base = nb;
+ w.source = j; // volatile write
+ rescan = true;
+ if (nstolen++ == 0 && a[nk] != null)
+ signalWork(); // propagate signal
+ w.topLevelExec(t, cfg);
+ if (q.base == nb)
+ continue; // no other pollers
+ }
+ if (++retries >= MAX_SCAN_RETRIES) {
+ rescan = true;
+ break sweep; // randomly move
}
}
}
}
+ if (nstolen != 0)
+ w.nsteals += nstolen;
}
! return rescan;
}
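Reviewer note: the revised scan derives an odd stride from the seed (`step = (r >>> 16) | 1`) and masks each index with `n - 1`. The standalone sketch below is not part of the patch. It only illustrates why that combination visits every slot exactly once per sweep when the table length is a power of two, which the masking in the code above appears to assume. The class and method names (`OddStrideSketch`, `distinctSlots`) are hypothetical.

```java
import java.util.BitSet;

public class OddStrideSketch {
    /** Counts distinct slots touched in one sweep of n steps; n must be a power of two. */
    static int distinctSlots(int n, int r) {
        int step = (r >>> 16) | 1;            // force an odd stride, as in scan
        BitSet seen = new BitSet(n);
        for (int l = n, i = r; l > 0; --l, i += step)
            seen.set(i & (n - 1));            // mask instead of modulo; valid because n is 2^k
        return seen.cardinality();
    }

    /** One round of the xorshift update used in runWorker. */
    static int xorshift(int r) {
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
        return r;
    }

    public static void main(String[] args) {
        int r = 0x9E3779B9;                   // arbitrary nonzero seed (assumption)
        for (int round = 0; round < 4; ++round) {
            r = xorshift(r);
            // An odd stride is coprime to a power-of-two length, so each sweep is a permutation.
            System.out.println(distinctSlots(64, r)); // prints 64 every round
        }
    }
}
```

Because the stride changes with every xorshift round, successive sweeps start at different slots and walk them in different orders, which is one plausible reading of the "randomly move" comment in the retry path.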
/**
! * Deactivates after failing to find work; reactivates on ctl
+ * contention, signal, or possible missed signal
*
* @param w the worker (may be null if already terminated)
* @param phase current phase
! * @return current phase, or IDLE for exit
! */
! private int deactivate(WorkQueue w, int phase) {
! int p = IDLE;
! if ((runState & STOP) == 0 && w != null) {
! int active = phase + (IDLE << 1);
! w.phase = phase | IDLE;
! long pc = ctl, qc = (active & LMASK) | ((pc - RC_UNIT) & UMASK);
w.stackPred = (int)pc; // set ctl stack link
! if (!compareAndSetCtl(pc, qc)) // try to enqueue
! p = w.phase = phase; // back out on contention
! else if ((qc & RC_MASK) > 0L || quiescent() <= 0) {
! boolean release = false;
WorkQueue[] qs = queues; // recheck queues
int n = (qs == null) ? 0 : qs.length;
for (int l = -n, j = active; l < n; ++l, ++j) {
! WorkQueue q = qs[j & (n - 1)];
! if (((p = w.phase) & IDLE) == 0)
! break; // interleave signal checks
! if (q != null && q.top - q.base > 0) {
! if (!release) // need multiple or reencounter
! release = true;
! else if (ctl == qc && compareAndSetCtl(qc, pc)) {
p = w.phase = active;
break; // possible missed signal
}
}
Thread.onSpinWait(); // reduce memory traffic
}
}
}
return p;
}
+ /**
+ * Awaits signal or termination.
+ *
+ * @param w the worker (may be null if already terminated)
+ * @param phase current phase or IDLE if already exiting
+ * @return current phase, with IDLE set if worker should exit
+ */
+ private int awaitWork(WorkQueue w, int phase) {
+ if (phase != IDLE && w != null) {
+ long deadline = 0L, c; // Use timeout if quiescent
+ long d = (w.source == INVALID_ID) ? TIMEOUT_SLOP : keepAlive;
+ if (((c = ctl) & RC_MASK) <= 0L && (int)c == phase + IDLE &&
+ (deadline = d + System.currentTimeMillis()) == 0L)
+ deadline = 1L; // avoid zero
+ LockSupport.setCurrentBlocker(this);
+ w.parking = 1; // enable unpark
+ for (;;) {
+ if ((runState & STOP) != 0)
+ break;
+ if (((phase = w.phase) & IDLE) == 0)
+ break;
+ U.park(deadline != 0L, deadline);
+ if (((phase = w.phase) & IDLE) == 0)
+ break;
+ if ((runState & STOP) != 0)
+ break;
+ Thread.interrupted(); // clear for next park
+ if (deadline != 0L && TIMEOUT_SLOP >
+ deadline - System.currentTimeMillis()) {
+ if (tryTrim(w))
+ break;
+ deadline = 0L; // no longer trimmable
+ }
+ }
+ w.parking = 0;
+ LockSupport.setCurrentBlocker(null);
+ }
+ return phase;
+ }
+
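Reviewer note: the park loop in the new awaitWork rechecks state both before and after parking, clears the interrupt status so the next park can block, and gives up once the deadline has passed. The sketch below shows the same shape using only the public `LockSupport` API. It is a simplified illustration, not the pool's code: the `signalled` flag stands in for "phase no longer IDLE", there is no `runState` or trimming logic, and the names `ParkLoopSketch`, `awaitSignal`, and `signal` are hypothetical.

```java
import java.util.concurrent.locks.LockSupport;

public class ParkLoopSketch {
    private volatile boolean signalled;   // stands in for the worker's phase check
    private volatile Thread waiter;       // thread to unpark, if any

    /** Returns true if signalled, false if the deadline elapsed first. */
    boolean awaitSignal(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        waiter = Thread.currentThread();  // publish before the first state check
        try {
            for (;;) {
                if (signalled)
                    return true;
                LockSupport.parkUntil(this, deadline); // may return spuriously
                if (signalled)
                    return true;
                Thread.interrupted();     // clear so the next park can block
                if (deadline - System.currentTimeMillis() <= 0L)
                    return false;         // timed out
            }
        } finally {
            waiter = null;
        }
    }

    /** Publishes the state change first, then wakes the waiter. */
    void signal() {
        signalled = true;
        LockSupport.unpark(waiter);       // unpark(null) has no effect
    }

    public static void main(String[] args) throws InterruptedException {
        ParkLoopSketch s = new ParkLoopSketch();
        Thread t = new Thread(() -> System.out.println(s.awaitSignal(5_000)));
        t.start();
        Thread.sleep(50);
        s.signal();
        t.join();
    }
}
```

Writing the state before calling `unpark`, and rechecking it after every return from `park`, is what makes spurious wakeups and early signals harmless in this pattern.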
+ /**
+ * Tries to remove and deregister worker after timeout, and release
+ * others to do the same. Fails if was instead signalled first.
+ */
+ private boolean tryTrim(WorkQueue w) {
+ int phase;
+ if (w != null && ((phase = w.phase) & IDLE) != 0) {
+ long sp = w.stackPred & LMASK, c = ctl;
+ long nc = sp | (UMASK & (c - TC_UNIT));
+ int active = phase + IDLE;
+ if ((int)c == active && compareAndSetCtl(c, nc)) {
+ WorkQueue[] vs; WorkQueue v; int vp, i;
+ w.source = DEREGISTERED;
+ w.phase = active; // try to wake up next waiter
+ if ((vp = (int)nc) != 0 && (vs = queues) != null &&
+ vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
+ compareAndSetCtl(
+ nc, ((UMASK & (nc + RC_UNIT)) |
+ (nc & TC_MASK) | (v.stackPred & LMASK)))) {
+ v.source = INVALID_ID; // enable cascaded timeouts
+ v.phase = vp;
+ U.unpark(v.owner);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Scans for and returns a polled task, if available. Used only
* for untracked polls. Begins scan at a random index to avoid
* systematic unfairness.
*
WorkQueue[] qs; WorkQueue v; int i;
if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
(v = qs[i]) != null &&
compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
v.phase = sp;
! if (v.parking != 0)
- U.unpark(v.owner);
stat = UNCOMPENSATE;
}
}
else if (active > minActive && total >= pc) { // reduce active workers
if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
WorkQueue[] qs; WorkQueue v; int i;
if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
(v = qs[i]) != null &&
compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
v.phase = sp;
! U.unpark(v.owner);
stat = UNCOMPENSATE;
}
}
else if (active > minActive && total >= pc) { // reduce active workers
if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
* @param task the task
* @param w caller's WorkQueue
* @param internal true if w is owned by a ForkJoinWorkerThread
* @return task status on exit, or UNCOMPENSATE for compensated blocking
*/
-
final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
if (w != null)
w.tryRemoveAndExec(task, internal);
int s = 0;
if (task != null && (s = task.status) >= 0 && internal && w != null) {
* @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
* @param interruptible true if return on interrupt
* @return positive if quiescent, negative if interrupted, else 0
*/
private int externalHelpQuiesce(long nanos, boolean interruptible) {
! if (!quiescent()) {
long startTime = System.nanoTime();
long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
for (int waits = 0;;) {
ForkJoinTask<?> t;
if (interruptible && Thread.interrupted())
return -1;
else if ((t = pollScan(false)) != null) {
waits = 0;
t.doExec();
}
! else if (quiescent())
break;
else if (System.nanoTime() - startTime > nanos)
return 0;
else if (waits == 0)
waits = MIN_SLEEP;
* @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
* @param interruptible true if return on interrupt
* @return positive if quiescent, negative if interrupted, else 0
*/
private int externalHelpQuiesce(long nanos, boolean interruptible) {
! if (quiescent() < 0) {
long startTime = System.nanoTime();
long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
for (int waits = 0;;) {
ForkJoinTask<?> t;
if (interruptible && Thread.interrupted())
return -1;
else if ((t = pollScan(false)) != null) {
waits = 0;
t.doExec();
}
! else if (quiescent() >= 0)
break;
else if (System.nanoTime() - startTime > nanos)
return 0;
else if (waits == 0)
waits = MIN_SLEEP;
* if no work and no active workers
* @param enable if true, terminate when next possible
* @return runState on exit
*/
private int tryTerminate(boolean now, boolean enable) {
! int e = runState;
if ((e & STOP) == 0) {
if (now) {
int s = lockRunState();
runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
if ((s & STOP) == 0)
interruptAll();
}
! else {
! int isShutdown = (e & SHUTDOWN);
! if (isShutdown == 0 && enable)
! getAndBitwiseOrRunState(isShutdown = SHUTDOWN);
! if (isShutdown != 0)
- quiescent(); // may trigger STOP
- e = runState;
}
}
if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
! int r = (int)Thread.currentThread().threadId(); // stagger traversals
! WorkQueue[] qs = queues;
! int n = (qs == null) ? 0 : qs.length;
! for (int l = n; l > 0; --l, ++r) {
! int j = r & SMASK & (n - 1); WorkQueue q; ForkJoinTask<?> t;
! while ((q = qs[j]) != null && q.source != DEREGISTERED &&
! (t = q.poll(null)) != null) {
! try {
! t.cancel(false);
! } catch (Throwable ignore) {
}
}
}
if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
CountDownLatch done; SharedThreadContainer ctr;
if ((done = termination) != null)
done.countDown();
if ((ctr = container) != null)
ctr.close();
}
- e = runState;
}
}
return e;
}
* if no work and no active workers
* @param enable if true, terminate when next possible
* @return runState on exit
*/
private int tryTerminate(boolean now, boolean enable) {
! int e = runState, isShutdown;
if ((e & STOP) == 0) {
if (now) {
int s = lockRunState();
runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
if ((s & STOP) == 0)
interruptAll();
}
! else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) {
! if (isShutdown == 0)
! getAndBitwiseOrRunState(SHUTDOWN);
! if (quiescent() > 0)
! e = runState;
}
}
if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
! if ((ctl & RC_MASK) > 0L) { // unless was quiescent
! int r = (int)Thread.currentThread().threadId();
! WorkQueue[] qs = queues; // stagger traversals
! int n = (qs == null) ? 0 : qs.length;
! for (int l = n; l > 0; --l, ++r) {
! WorkQueue q; ForkJoinTask<?> t;
! if ((q = qs[r & (n - 1)]) != null &&
! q.source != DEREGISTERED) {
! while ((t = q.poll(null)) != null) {
! try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
+ }
}
}
}
if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
+ e |= TERMINATED;
if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
CountDownLatch done; SharedThreadContainer ctr;
if ((done = termination) != null)
done.countDown();
if ((ctr = container) != null)
ctr.close();
}
}
}
return e;
}
* threads remain inactive.
*
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
! return quiescent();
}
/**
* Returns an estimate of the total number of completed tasks that
* were executed by a thread other than their submitter. The
* threads remain inactive.
*
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
! return quiescent() >= 0;
}
/**
* Returns an estimate of the total number of completed tasks that
* were executed by a thread other than their submitter. The
/**
* Invokes tryCompensate to create or re-activate a spare thread to
* compensate for a thread that performs a blocking operation. When the
* blocking operation is done then endCompensatedBlock must be invoked
* with the value returned by this method to re-adjust the parallelism.
+ * @return value to use in endCompensatedBlock
*/
final long beginCompensatedBlock() {
int c;
do {} while ((c = tryCompensate(ctl)) < 0);
return (c == 0) ? 0L : RC_UNIT;
}
/**
* Re-adjusts parallelism after a blocking operation completes.
+ * @param post value from beginCompensatedBlock
*/
void endCompensatedBlock(long post) {
if (post > 0L) {
getAndAddCtl(post);
}
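Reviewer note: beginCompensatedBlock and endCompensatedBlock are internal, so application code cannot call them. For comparison, the sketch below uses the public `ForkJoinPool.managedBlock` and `ForkJoinPool.ManagedBlocker` API, which gives the pool a similar opportunity to compensate while a worker blocks. Whether it follows the same internal path as the methods above is an assumption here. The names `ManagedBlockSketch`, `QueueTaker`, and `takeInPool` are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

public class ManagedBlockSketch {
    /** Blocker that takes a single item from a queue. */
    static final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private E item;
        QueueTaker(BlockingQueue<E> queue) { this.queue = queue; }
        @Override public boolean block() throws InterruptedException {
            if (item == null)
                item = queue.take();      // the potentially blocking call
            return true;                  // no further blocking needed
        }
        @Override public boolean isReleasable() {
            return item != null || (item = queue.poll()) != null;
        }
        E item() { return item; }
    }

    /** Runs a blocking take inside a single-worker pool and returns the item. */
    static String takeInPool(BlockingQueue<String> queue) {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.submit(() -> {
                QueueTaker<String> taker = new QueueTaker<>(queue);
                ForkJoinPool.managedBlock(taker); // pool may add a spare worker meanwhile
                return taker.item();
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("ready");
        System.out.println(takeInPool(queue)); // prints "ready"
    }
}
```

The internal pair above follows the same discipline as any begin/end API: the value returned by the begin call must reach the end call on every path, so callers would normally place the end call in a `finally` block.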