
src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

*** 292,14 ***
       * illustrates some of the implementation decisions in this class.
       *
       *  * Slot k must be read with an acquiring read, which it must
       *    anyway to dereference and run the task if the (acquiring)
       *    CAS succeeds, but uses an explicit acquire fence to support
!      *    the following rechecks even if the CAS is not attempted.  To
-      *    more easily distinguish among kinds of CAS failures, we use
-      *    the compareAndExchange version, and usually handle null
-      *    returns (indicating contention) separately from others.
       *
       *  * q.base may change between reading and using its value to
       *    index the slot. To avoid trying to use the wrong t, the
       *    index and slot must be reread (not necessarily immediately)
       *    until consistent, unless this is a local poll by owner, in
--- 292,11 ---
       * illustrates some of the implementation decisions in this class.
       *
       *  * Slot k must be read with an acquiring read, which it must
       *    anyway to dereference and run the task if the (acquiring)
       *    CAS succeeds, but uses an explicit acquire fence to support
!      *    the following rechecks even if the CAS is not attempted.
       *
       *  * q.base may change between reading and using its value to
       *    index the slot. To avoid trying to use the wrong t, the
       *    index and slot must be reread (not necessarily immediately)
       *    until consistent, unless this is a local poll by owner, in

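The bullet above describes reading slot k with an acquiring read, claiming it with a CAS, and using an explicit acquire fence to cover rechecks when the CAS is skipped. A minimal sketch of that pattern using public VarHandles (the pool itself uses internal Unsafe primitives; names here are illustrative):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class SlotClaim {
    private static final VarHandle SLOTS =
        MethodHandles.arrayElementVarHandle(Runnable[].class);

    /** Attempts to claim the task at index k, returning it or null. */
    static Runnable tryClaim(Runnable[] a, int k) {
        Runnable t = (Runnable) SLOTS.getAcquire(a, k);  // acquiring read
        if (t != null && SLOTS.compareAndSet(a, k, t, null))
            return t;                        // CAS succeeded; slot is ours
        VarHandle.acquireFence();            // order subsequent rechecks
        return null;                         // contention or empty slot
    }
}
```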
*** 561,32 ***
       *    opportunities to over-signal.  Note that in either of these
       *    contexts, signals may be (and often are) unnecessary because
       *    active workers continue scanning after running tasks without
       *    the need to be signalled (which is one reason work stealing
       *    is often faster than alternatives), so additional workers
!      *    aren't needed. We filter out some of these cases by exiting
-      *    retry loops in signalWork if the task responsible for the
-      *    signal has already been taken.
       *
       * * For rapidly branching tasks that require full pool resources,
       *   oversignalling is OK, because signalWork will soon have no
       *   more workers to create or reactivate. But for others (mainly
       *   externally submitted tasks), overprovisioning may cause very
       *   noticeable slowdowns due to contention and resource
!      *   wastage. All remedies are intrinsically heuristic. We use a
!      *   strategy that works well in most cases: We track "sources"
!      *   (queue ids) of non-empty (usually polled) queues while
-      *   scanning. These are maintained in the "source" field of
-      *   WorkQueues for use in method helpJoin and elsewhere (see
-      *   below). We also maintain them as arguments/results of
-      *   top-level polls (argument "window" in method scan, with setup
-      *   in method runWorker) as an encoded sliding window of current
-      *   and previous two sources (or INVALID_ID if none), and stop
-      *   signalling when all were from the same source. These
-      *   mechanisms may result in transiently too few workers, but
-      *   once workers poll from a new source, they rapidly reactivate
-      *   others.
       *
       * * Despite these, signal contention and overhead effects still
       *   occur during ramp-up and ramp-down of small computations.
       *
       * Scanning. Method scan performs top-level scanning for (and
--- 558,20 ---
       *    opportunities to over-signal.  Note that in either of these
       *    contexts, signals may be (and often are) unnecessary because
       *    active workers continue scanning after running tasks without
       *    the need to be signalled (which is one reason work stealing
       *    is often faster than alternatives), so additional workers
!      *    aren't needed.
       *
       * * For rapidly branching tasks that require full pool resources,
       *   oversignalling is OK, because signalWork will soon have no
       *   more workers to create or reactivate. But for others (mainly
       *   externally submitted tasks), overprovisioning may cause very
       *   noticeable slowdowns due to contention and resource
!      *   wastage. We reduce impact by deactivating workers when
!      *   queues don't have accessible tasks, but reactivating and
!      *   rescanning if other tasks remain.
       *
       * * Despite these, signal contention and overhead effects still
       *   occur during ramp-up and ramp-down of small computations.
       *
       * Scanning. Method scan performs top-level scanning for (and

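The new text reduces over-signalling by deactivating workers when no tasks are accessible, then reactivating and rescanning if tasks remain. A toy rendition of that deactivate/recheck pattern (fields and names here are hypothetical, not the pool's):

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DeactivateRecheck {
    final AtomicInteger active = new AtomicInteger(1);   // 1 = active, 0 = idle
    final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    /** Returns true if the worker may park; false if it must rescan. */
    boolean tryDeactivate() {
        active.set(0);                 // publish idleness before rechecking
        if (tasks.peek() != null) {    // a task may have arrived meanwhile
            active.set(1);             // reactivate and rescan instead
            return false;
        }
        return true;                   // no accessible tasks; safe to park
    }
}
```

Publishing idleness before the recheck is what prevents a missed signal: any producer that enqueues after the recheck will observe the idle state and signal.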
*** 596,29 ***
       * method as WorkQueue.poll(), but restarts with a different
       * permutation on each invocation.  The pseudorandom generator
       * need not have high-quality statistical properties in the long
       * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
       * from ThreadLocalRandom probes, which are cheap and
!      * suffice. Scans do not otherwise explicitly take into account
!      * core affinities, loads, cache localities, etc. However, they do
!      * exploit temporal locality (which usually approximates these) by
!      * preferring to re-poll from the same queue (either in method
!      * tryPoll() or scan) after a successful poll before trying others
!      * (see method topLevelExec), which also reduces bookkeeping,
!      * cache traffic, and scanning overhead. But it also reduces
!      * fairness, which is partially counteracted by giving up on
!      * contention.
       *
       * Deactivation. When method scan indicates that no tasks are
!      * found by a worker, it tries to deactivate (in awaitWork),
!      * giving up (and rescanning) on ctl contention. To avoid missed
!      * signals during deactivation, the method rescans and reactivates
!      * if there may have been a missed signal during deactivation,
!      * filtering out most cases in which this is unnecessary. Because
!      * idle workers are often not yet blocked (parked), we use the
!      * WorkQueue parking field to advertise that a waiter actually
!      * needs unparking upon signal.
       *
       * Quiescence. Workers scan looking for work, giving up when they
       * don't find any, without being sure that none are available.
       * However, some required functionality relies on consensus about
       * quiescence (also termination, discussed below). The count
--- 581,30 ---
       * method as WorkQueue.poll(), but restarts with a different
       * permutation on each invocation.  The pseudorandom generator
       * need not have high-quality statistical properties in the long
       * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
       * from ThreadLocalRandom probes, which are cheap and
!      * suffice. Each queue's polling attempt uses a bounded retry
!      * (MAX_SCAN_RETRIES) to avoid becoming stuck when other
!      * scanners/pollers stall.  Scans do not otherwise explicitly take
!      * into account core affinities, loads, cache localities, etc.
!      * However, they do exploit temporal locality (which usually
!      * approximates these) by preferring to re-poll from the same
!      * queue after a successful poll before trying others, which also
!      * reduces bookkeeping, cache traffic, and scanning overhead. But
!      * it also reduces fairness, which is partially counteracted by
+      * giving up on contention.
       *
       * Deactivation. When method scan indicates that no tasks are
!      * found by a worker, it tries to deactivate(), giving up (and
!      * rescanning) on ctl contention. To avoid missed signals during
!      * deactivation, the method rescans and reactivates if there may
!      * have been a missed signal during deactivation, filtering out
!      * most cases in which this is unnecessary. Because idle workers
!      * are often not yet blocked (parked), we use the WorkQueue
!      * parking field to advertise that a waiter actually needs
!      * unparking upon signal.
       *
       * Quiescence. Workers scan looking for work, giving up when they
       * don't find any, without being sure that none are available.
       * However, some required functionality relies on consensus about
       * quiescence (also termination, discussed below). The count

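The scan commentary names Marsaglia XorShifts as the cheap pseudorandom source for scan permutations. A minimal sketch of a 32-bit xorshift step and its use to pick a scan origin (the shift triple 13/17/5 is one of Marsaglia's standard full-period triples; the pool's actual seeding and constants may differ):

```java
public class XorShiftScan {
    /** One xorshift step; maps any nonzero seed to another nonzero value. */
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    /** Picks a starting queue index for a scan over n (a power of two) queues. */
    static int scanOrigin(int r, int n) {
        return r & (n - 1);
    }
}
```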
*** 666,13 ***
       *
       * Trimming workers. To release resources after periods of lack of
       * use, a worker starting to wait when the pool is quiescent will
       * time out and terminate if the pool has remained quiescent for
       * period given by field keepAlive (default 60sec), which applies
!      * to the first timeout of a fully populated pool. Subsequent (or
!      * other) cases use delays such that, if still quiescent, all will
!      * be released before one additional keepAlive unit elapses.
       *
       * Joining Tasks
       * =============
       *
       * The "Join" part of ForkJoinPools consists of a set of
--- 652,13 ---
       *
       * Trimming workers. To release resources after periods of lack of
       * use, a worker starting to wait when the pool is quiescent will
       * time out and terminate if the pool has remained quiescent for
       * period given by field keepAlive (default 60sec), which applies
 !      * to the first timeout of a quiescent pool. Subsequent cases use
 !      * minimal delays such that, if still quiescent, all will be
 !      * released soon thereafter.
       *
       * Joining Tasks
       * =============
       *
       * The "Join" part of ForkJoinPools consists of a set of

*** 895,14 ***
       * arrays are better for locality and reduce GC scan time, but
       * large arrays reduce both direct false-sharing and indirect
       * cases due to GC bookkeeping (cardmarks etc), and reduce the
       * number of resizes, which are not especially fast because they
       * require atomic transfers.  Currently, arrays are initialized to
!      * be fairly small.  (Maintenance note: any changes in fields,
!      * queues, or their uses, or JVM layout policies, must be
!      * accompanied by re-evaluation of these placement and sizing
!      * decisions.)
       *
       * Style notes
       * ===========
       *
       * Memory ordering relies mainly on atomic operations (CAS,
--- 881,15 ---
       * arrays are better for locality and reduce GC scan time, but
       * large arrays reduce both direct false-sharing and indirect
       * cases due to GC bookkeeping (cardmarks etc), and reduce the
       * number of resizes, which are not especially fast because they
       * require atomic transfers.  Currently, arrays are initialized to
!      * be fairly small but early resizes rapidly increase size by more
!      * than a factor of two until very large. (Maintenance note: any
!      * changes in fields, queues, or their uses, or JVM layout
!      * policies, must be accompanied by re-evaluation of these
+      * placement and sizing decisions.)
       *
       * Style notes
       * ===========
       *
       * Memory ordering relies mainly on atomic operations (CAS,

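The new text says early resizes increase capacity by more than a factor of two until the array is very large; the growArray code later in this diff quadruples below 1<<24 elements and doubles above. A sketch of that growth schedule:

```java
public class GrowthSchedule {
    /** Next queue-array capacity: quadruple while small, then double. */
    static int nextCapacity(int cap) {
        return (cap < 1 << 24) ? cap << 2 : cap << 1;
    }
}
```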
*** 952,28 ***
       * (5) Internal control methods
       * (6) Callbacks and other support for ForkJoinTask methods
       * (7) Exported methods
       * (8) Static block initializing statics in minimally dependent order
       *
-      * Revision notes
-      * ==============
-      *
-      * The main sources of differences from previous version are:
-      *
-      * * New abstract class ForkJoinTask.InterruptibleTask ensures
-      *   handling of tasks submitted under the ExecutorService
-      *   API are consistent with specs.
-      * * Method quiescent() replaces previous quiescence-related
-      *   checks, relying on versioning and sequence locking instead
-      *   of ReentrantLock.
-      * * Termination processing now ensures that internal data
-      *   structures are maintained consistently enough while stopping
-      *   to interrupt all workers and cancel all tasks. It also uses a
-      *   CountDownLatch instead of a Condition for termination because
-      *   of lock change.
-      * * Many other changes to avoid performance regressions due
-      *   to the above.
       */
  
      // static configuration constants
  
      /**
--- 939,10 ---

*** 981,11 ***
       * to park waiting for new work before terminating.
       */
      static final long DEFAULT_KEEPALIVE = 60_000L;
  
      /**
!      * Undershoot tolerance for idle timeouts
       */
      static final long TIMEOUT_SLOP = 20L;
  
      /**
       * The default value for common pool maxSpares.  Overridable using
--- 950,12 ---
       * to park waiting for new work before terminating.
       */
      static final long DEFAULT_KEEPALIVE = 60_000L;
  
      /**
!      * Undershoot tolerance for idle timeouts, also serving as the
+      * minimum allowed timeout value.
       */
      static final long TIMEOUT_SLOP = 20L;
  
      /**
       * The default value for common pool maxSpares.  Overridable using

*** 1027,21 ***
      // {pool, workQueue} config bits
      static final int FIFO             = 1 << 0;   // fifo queue or access mode
      static final int CLEAR_TLS        = 1 << 1;   // set for Innocuous workers
      static final int PRESET_SIZE      = 1 << 2;   // size was set by property
  
-     // source history window packing used in scan() and runWorker()
-     static final long RESCAN          = 1L << 63; // must be negative
-     static final long HMASK           = ((((long)SMASK) << 32) |
-                                          (((long)SMASK) << 16)); // history bits
-     static final long NO_HISTORY      = ((((long)INVALID_ID) << 32) | // no 3rd
-                                          (((long)INVALID_ID) << 16)); // no 2nd
- 
      // others
      static final int DEREGISTERED     = 1 << 31;  // worker terminating
      static final int UNCOMPENSATE     = 1 << 16;  // tryCompensate return
      static final int IDLE             = 1 << 16;  // phase seqlock/version count
  
      /*
       * Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
       * RC: Number of released (unqueued) workers
       * TC: Number of total workers
--- 997,15 ---
      // {pool, workQueue} config bits
      static final int FIFO             = 1 << 0;   // fifo queue or access mode
      static final int CLEAR_TLS        = 1 << 1;   // set for Innocuous workers
      static final int PRESET_SIZE      = 1 << 2;   // size was set by property
  
      // others
      static final int DEREGISTERED     = 1 << 31;  // worker terminating
      static final int UNCOMPENSATE     = 1 << 16;  // tryCompensate return
      static final int IDLE             = 1 << 16;  // phase seqlock/version count
+     static final int MAX_SCAN_RETRIES = 8;        // scan retry limit
  
      /*
       * Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
       * RC: Number of released (unqueued) workers
       * TC: Number of total workers

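The ctl field packs 16-bit subfields into one long so that related counts can be adjusted by a single CAS. A simplified sketch of the RC/TC packing named above, with a combined decrement mirroring the deregistration arithmetic shown later in this diff (shift values follow the class's convention; the low 32 bits, masked by LMASK, are left untouched):

```java
public class CtlPacking {
    static final int  RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_UNIT  = 1L << RC_SHIFT, TC_UNIT = 1L << TC_SHIFT;
    static final long RC_MASK  = 0xffffL << RC_SHIFT, TC_MASK = 0xffffL << TC_SHIFT;
    static final long LMASK    = 0xffffffffL;          // low 32 bits

    static long pack(int rc, int tc) {
        return ((long) rc << RC_SHIFT) | ((long) tc << TC_SHIFT);
    }
    static short rc(long ctl) { return (short) (ctl >>> RC_SHIFT); }
    static short tc(long ctl) { return (short) (ctl >>> TC_SHIFT); }

    /** Decrements both counts in one step, preserving the low bits. */
    static long removeWorker(long c) {
        return (RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (LMASK & c);
    }
}
```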
*** 1226,22 ***
          // Support for atomic operations
          private static final Unsafe U;
          private static final long PHASE;
          private static final long BASE;
          private static final long TOP;
-         private static final long SOURCE;
          private static final long ARRAY;
  
          final void updateBase(int v) {
              U.putIntVolatile(this, BASE, v);
          }
          final void updateTop(int v) {
              U.putIntOpaque(this, TOP, v);
          }
-         final void forgetSource() {
-             U.putIntOpaque(this, SOURCE, 0);
-         }
          final void updateArray(ForkJoinTask<?>[] a) {
              U.getAndSetReference(this, ARRAY, a);
          }
          final void unlockPhase() {
              U.getAndAddInt(this, PHASE, IDLE);
--- 1190,18 ---

*** 1289,46 ***
           * @param internal if caller owns this queue
           * @throws RejectedExecutionException if array could not be resized
           */
          final void push(ForkJoinTask<?> task, ForkJoinPool pool,
                          boolean internal) {
!             int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask<?>[] a;
!             if ((a = array) == null || (cap = a.length) <= 0 ||
!                 (room = (m = cap - 1) - (s - b)) < 0) { // could not resize
                  if (!internal)
                      unlockPhase();
!                 throw new RejectedExecutionException("Queue capacity exceeded");
              }
!             top = s + 1;
!             long pos = slotOffset(p = m & s);
!             if (!internal)
!                 U.putReference(a, pos, task);         // inside lock
!             else
!                 U.getAndSetReference(a, pos, task);   // fully fenced
!             if (room == 0 && (newCap = cap << 1) > 0) {
                  ForkJoinTask<?>[] newArray = null;
!                 try {                                 // resize for next time
                      newArray = new ForkJoinTask<?>[newCap];
                  } catch (OutOfMemoryError ex) {
                  }
                  if (newArray != null) {               // else throw on next push
!                     int newMask = newCap - 1;         // poll old, push to new
                      for (int k = s, j = cap; j > 0; --j, --k) {
!                         ForkJoinTask<?> u;
                          if ((u = (ForkJoinTask<?>)U.getAndSetReference(
!                                  a, slotOffset(k & m), null)) == null)
                              break;                    // lost to pollers
                          newArray[k & newMask] = u;
                      }
!                     updateArray(newArray);            // fully fenced
                  }
-                 a = null;                             // always signal
              }
-             if (!internal)
-                 unlockPhase();
-             if ((a == null || a[m & (s - 1)] == null) && pool != null)
-                 pool.signalWork(a, p);
          }
  
          /**
           * Takes next task, if one exists, in order specified by mode,
           * so acts as either local-pop or local-poll. Called only by owner.
--- 1249,57 ---
           * @param internal if caller owns this queue
           * @throws RejectedExecutionException if array could not be resized
           */
          final void push(ForkJoinTask<?> task, ForkJoinPool pool,
                          boolean internal) {
!             int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
!             if ((a = array) != null && (cap = a.length) > 0) { // else disabled
!                 if ((room = (m = cap - 1) - (s - b)) >= 0) {
+                     top = s + 1;
+                     long pos = slotOffset(m & s);
+                     if (!internal)
+                         U.putReference(a, pos, task);       // inside lock
+                     else
+                         U.getAndSetReference(a, pos, task); // fully fenced
+                     if (room == 0)                          // resize
+                         growArray(a, cap, s);
+                 }
                  if (!internal)
                      unlockPhase();
!                 if (room < 0)
+                     throw new RejectedExecutionException("Queue capacity exceeded");
+                 else if ((room == 0 || a[m & (s - 1)] == null) && pool != null)
+                     pool.signalWork();
              }
!         }
! 
!         /**
!          * Resizes the queue array unless out of memory.
!          * @param a old array
!          * @param cap old array capacity
!          * @param s current top
+          */
+         private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
+             int newCap = (cap < 1 << 24) ? cap << 2 : cap << 1;
+             if (a != null && a.length == cap && cap > 0 && newCap > 0) {
                  ForkJoinTask<?>[] newArray = null;
!                 try {
                      newArray = new ForkJoinTask<?>[newCap];
                  } catch (OutOfMemoryError ex) {
                  }
                  if (newArray != null) {               // else throw on next push
!                     int mask = cap - 1, newMask = newCap - 1;
                      for (int k = s, j = cap; j > 0; --j, --k) {
!                         ForkJoinTask<?> u;            // poll old, push to new
                          if ((u = (ForkJoinTask<?>)U.getAndSetReference(
!                                  a, slotOffset(k & mask), null)) == null)
                              break;                    // lost to pollers
                          newArray[k & newMask] = u;
                      }
!                     updateArray(newArray);           // fully fenced
                  }
              }
          }
  
          /**
           * Takes next task, if one exists, in order specified by mode,
           * so acts as either local-pop or local-poll. Called only by owner.

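The rewritten push above computes room = (cap - 1) - (top - base): negative room means an earlier resize failed so the push is rejected, and room == 0 means this push fills the last free slot, triggering a resize for next time. A single-threaded sketch of that interplay, without the fences, locking, and OutOfMemoryError handling (names are illustrative):

```java
public class SimplePush {
    Object[] array = new Object[4];
    int top, base;

    void push(Object task) {
        Object[] a = array; int cap = a.length, m = cap - 1, s = top;
        int room = m - (s - base);
        if (room < 0)                         // earlier resize failed
            throw new IllegalStateException("Queue capacity exceeded");
        a[m & s] = task;                      // index modulo power-of-two cap
        top = s + 1;
        if (room == 0)                        // just filled last slot: grow
            grow(a, cap, s);
    }

    private void grow(Object[] a, int cap, int s) {
        int newCap = (cap < 1 << 24) ? cap << 2 : cap << 1;
        Object[] na = new Object[newCap];
        for (int k = s, j = cap; j > 0; --j, --k)   // copy under both masks
            na[k & (newCap - 1)] = a[k & (cap - 1)];
        array = na;
    }
}
```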
*** 1336,12 ***
           */
          private ForkJoinTask<?> nextLocalTask(int fifo) {
              ForkJoinTask<?> t = null;
              ForkJoinTask<?>[] a = array;
              int b = base, p = top, cap;
!             if (a != null && (cap = a.length) > 0) {
!                 for (int m = cap - 1, s, nb; p - b > 0; ) {
                      if (fifo == 0 || (nb = b + 1) == p) {
                          if ((t = (ForkJoinTask<?>)U.getAndSetReference(
                                   a, slotOffset(m & (s = p - 1)), null)) != null)
                              updateTop(s);       // else lost race for only task
                          break;
--- 1307,12 ---
           */
          private ForkJoinTask<?> nextLocalTask(int fifo) {
              ForkJoinTask<?> t = null;
              ForkJoinTask<?>[] a = array;
              int b = base, p = top, cap;
!             if (p - b > 0 && a != null && (cap = a.length) > 0) {
!                 for (int m = cap - 1, s, nb;;) {
                      if (fifo == 0 || (nb = b + 1) == p) {
                          if ((t = (ForkJoinTask<?>)U.getAndSetReference(
                                   a, slotOffset(m & (s = p - 1)), null)) != null)
                              updateTop(s);       // else lost race for only task
                          break;

*** 1353,10 ***
--- 1324,12 ---
                      }
                      while (b == (b = base)) {
                          U.loadFence();
                          Thread.onSpinWait();    // spin to reduce memory traffic
                      }
+                     if (p - b <= 0)
+                         break;
                  }
              }
              return t;
          }
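The mode check in nextLocalTask above (fifo == 0, or base + 1 == top for the sole remaining element) selects between taking the newest task from top and the oldest from base. A plain sketch of those two disciplines using ArrayDeque (the real queue is lock-free; this is not):

```java
import java.util.ArrayDeque;

public class LocalTaskOrder {
    final ArrayDeque<String> deque = new ArrayDeque<>();

    void push(String t) { deque.addLast(t); }   // owner pushes at top

    String nextLocalTask(boolean fifo) {
        return fifo ? deque.pollFirst()         // local-poll: oldest first
                    : deque.pollLast();         // local-pop: newest first
    }
}
```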
  

*** 1415,86 ***
           * Polls for a task. Used only by non-owners.
           *
           * @param pool if nonnull, pool to signal if more tasks exist
           */
          final ForkJoinTask<?> poll(ForkJoinPool pool) {
!             for (;;) {
!                 ForkJoinTask<?>[] a = array;
!                 int b = base, cap, k;
!                 if (a == null || (cap = a.length) <= 0)
!                     break;
!                 ForkJoinTask<?> t = a[k = b & (cap - 1)];
!                 U.loadFence();
!                 if (base == b) {
!                     Object o;
!                     int nb = b + 1, nk = nb & (cap - 1);
!                     if (t == null)
!                         o = a[k];
!                     else if (t == (o = U.compareAndExchangeReference(
!                                        a, slotOffset(k), t, null))) {
                          updateBase(nb);
                          if (a[nk] != null && pool != null)
!                             pool.signalWork(a, nk); // propagate
                          return t;
                      }
-                     if (o == null && a[nk] == null && array == a &&
-                         (phase & (IDLE | 1)) != 0 && top - base <= 0)
-                         break;                    // empty
-                 }
-             }
-             return null;
-         }
- 
-         /**
-          * Tries to poll next task in FIFO order, failing without
-          * retries on contention or stalls. Used only by topLevelExec
-          * to repoll from the queue obtained from pool.scan.
-          */
-         private ForkJoinTask<?> tryPoll() {
-             ForkJoinTask<?>[] a; int cap;
-             if ((a = array) != null && (cap = a.length) > 0) {
-                 for (int b = base, k;;) {  // loop only if inconsistent
-                     ForkJoinTask<?> t = a[k = b & (cap - 1)];
-                     U.loadFence();
-                     if (b == (b = base)) {
-                         Object o;
-                         if (t == null)
-                             o = a[k];
-                         else if (t == (o = U.compareAndExchangeReference(
-                                            a, slotOffset(k), t, null))) {
-                             updateBase(b + 1);
-                             return t;
-                         }
-                         if (o == null)
-                             break;
-                     }
                  }
              }
              return null;
          }
  
          // specialized execution methods
  
          /**
!          * Runs the given (stolen) task if nonnull, as well as
-          * remaining local tasks and/or others available from the
-          * given queue, if any.
           */
!         final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, int srcId) {
!             int cfg = config, fifo = cfg & FIFO, nstolen = nsteals + 1;
-             if ((srcId & 1) != 0) // don't record external sources
-                 source = srcId;
-             if ((cfg & CLEAR_TLS) != 0)
-                 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
              while (task != null) {
                  task.doExec();
!                 if ((task = nextLocalTask(fifo)) == null && src != null &&
-                     (task = src.tryPoll()) != null)
-                     ++nstolen;
              }
!             nsteals = nstolen;
!             forgetSource();
          }
  
          /**
           * Deep form of tryUnpush: Traverses from top and removes and
           * runs task if present.
--- 1388,55 ---
           * Polls for a task. Used only by non-owners.
           *
           * @param pool if nonnull, pool to signal if more tasks exist
           */
          final ForkJoinTask<?> poll(ForkJoinPool pool) {
!             if ((phase & (IDLE | 1)) == 0 || top - base > 0) {
!                 for (boolean stalled = false;;) {
!                     int cap, b, k, nb; ForkJoinTask<?>[] a;
!                     if ((a = array) == null || (cap = a.length) <= 0)
!                         break;
!                     long kp = slotOffset(k = (cap - 1) & (b = base));
!                     int nk = (nb = b + 1) & (cap - 1); // next slot
!                     int sk = (b + 2) & (cap - 1);      // 2nd slot ahead
!                     ForkJoinTask<?> t = a[k];
!                     U.loadFence();
!                     if (base != b)
!                         ;                              // inconsistent
!                     else if (t == null) {
!                         if (a[sk] == null && a[nk] == null && a[k] == null &&
+                             top - b <= 0)
+                             break;                     // empty
+                         if (stalled)
+                             Thread.onSpinWait();
+                         stalled = true;
+                         U.loadFence();                 // reread
+                     }
+                     else if (U.compareAndSetReference(a, kp, t, null)) {
                          updateBase(nb);
                          if (a[nk] != null && pool != null)
!                             pool.signalWork();         // propagate
                          return t;
                      }
                  }
              }
              return null;
          }
  
          // specialized execution methods
  
          /**
!          * Runs the given task, as well as remaining local tasks.
           */
!         final void topLevelExec(ForkJoinTask<?> task, int cfg) {
!             int fifo = cfg & FIFO;
              while (task != null) {
                  task.doExec();
!                 task = nextLocalTask(fifo);
              }
!             if ((cfg & CLEAR_TLS) != 0)
!                 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
          }
  
          /**
           * Deep form of tryUnpush: Traverses from top and removes and
           * runs task if present.

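The rewritten topLevelExec above runs the stolen task and then drains remaining local tasks in the configured mode before returning to scan. A toy rendition of that loop (no TLS clearing, locking, or steal accounting; names are illustrative):

```java
import java.util.ArrayDeque;

public class TopLevel {
    /** Runs the stolen task, then local tasks in order, returning the count. */
    static int runAll(Runnable stolen, ArrayDeque<Runnable> local, boolean fifo) {
        int ran = 0;
        Runnable task = stolen;
        while (task != null) {
            task.run();
            ++ran;
            task = fifo ? local.pollFirst() : local.pollLast();
        }
        return ran;
    }
}
```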
*** 1632,11 ***
              U = Unsafe.getUnsafe();
              Class<WorkQueue> klass = WorkQueue.class;
              PHASE = U.objectFieldOffset(klass, "phase");
              BASE = U.objectFieldOffset(klass, "base");
              TOP = U.objectFieldOffset(klass, "top");
-             SOURCE = U.objectFieldOffset(klass, "source");
              ARRAY = U.objectFieldOffset(klass, "array");
          }
      }
  
      // static fields (initialized in static initializer below)
--- 1574,10 ---

*** 1869,149 ***
              if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
                  w.source = DEREGISTERED;
                  if (phase != 0) {         // else failed to start
                      replaceable = true;
                      if ((phase & IDLE) != 0)
!                         reactivate(w);    // pool stopped before released
                  }
              }
          }
!         long c = ctl;
!         if (src != DEREGISTERED)          // decrement counts
              do {} while (c != (c = compareAndExchangeCtl(
                                     c, ((RC_MASK & (c - RC_UNIT)) |
                                         (TC_MASK & (c - TC_UNIT)) |
                                         (LMASK & c)))));
!         else if ((int)c != 0)
!             replaceable = true;           // signal below to cascade timeouts
!         if (w != null) {                  // cancel remaining tasks
!             for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
!                 try {
!                     t.cancel(false);
-                 } catch (Throwable ignore) {
                  }
              }
          }
          if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
              WorkQueue[] qs; int n, i;     // remove index unless terminating
              long ns = w.nsteals & 0xffffffffL;
!             int stop = lockRunState() & STOP;
!             if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0 &&
!                 qs[i = phase & SMASK & (n - 1)] == w) {
                  qs[i] = null;
                  stealCount += ns;         // accumulate steals
              }
              unlockRunState();
          }
-         if ((runState & STOP) == 0 && replaceable)
-             signalWork(null, 0); // may replace unless trimmed or uninitialized
          if (ex != null)
              ForkJoinTask.rethrow(ex);
      }
  
      /**
!      * Releases an idle worker, or creates one if not enough exist,
-      * returning on contention if a signal task is already taken.
-      *
-      * @param a if nonnull, a task array holding task signalled
-      * @param k index of task in array
       */
!     final void signalWork(ForkJoinTask<?>[] a, int k) {
          int pc = parallelism;
          for (long c = ctl;;) {
              WorkQueue[] qs = queues;
              long ac = (c + RC_UNIT) & RC_MASK, nc;
              int sp = (int)c, i = sp & SMASK;
!             if (qs == null || qs.length <= i)
                  break;
              WorkQueue w = qs[i], v = null;
              if (sp == 0) {
                  if ((short)(c >>> TC_SHIFT) >= pc)
                      break;
                  nc = ((c + TC_UNIT) & TC_MASK);
              }
!             else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null)
                  break;
!             else
                  nc = (v.stackPred & LMASK) | (c & TC_MASK);
              if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
                  if (v == null)
                      createWorker();
                  else {
                      v.phase = sp;
                      if (v.parking != 0)
!                         U.unpark(v.owner);
                  }
                  break;
              }
-             if (a != null && k >= 0 && k < a.length && a[k] == null)
-                 break;
          }
      }
  
      /**
       * Reactivates the given worker, and possibly others if not top of
       * ctl stack. Called only during shutdown to ensure release on
       * termination.
       */
!     private void reactivate(WorkQueue w) {
          for (long c = ctl;;) {
              WorkQueue[] qs; WorkQueue v; int sp, i;
!             if ((qs = queues) == null || (sp = (int)c) == 0 ||
!                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null ||
-                 (v != w && w != null && (w.phase & IDLE) == 0))
                  break;
              if (c == (c = compareAndExchangeCtl(
                            c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
                                (v.stackPred & LMASK))))) {
                  v.phase = sp;
!                 if (v == w)
-                     break;
-                 if (v.parking != 0)
-                     U.unpark(v.owner);
              }
          }
      }
  
      /**
       * Internal version of isQuiescent and related functionality.
!      * @return true if terminating or all workers are inactive and
!      * submission queues are empty and unlocked; if so, setting STOP
!      * if shutdown is enabled
       */
!     private boolean quiescent() {
          outer: for (;;) {
              long phaseSum = 0L;
              boolean swept = false;
              for (int e, prevRunState = 0; ; prevRunState = e) {
                  long c = ctl;
                  if (((e = runState) & STOP) != 0)
!                     return true;                          // terminating
                  else if ((c & RC_MASK) > 0L)
!                     return false;                         // at least one active
                  else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
                      long sum = c;
                      WorkQueue[] qs = queues; WorkQueue q;
                      int n = (qs == null) ? 0 : qs.length;
                      for (int i = 0; i < n; ++i) {         // scan queues
                          if ((q = qs[i]) != null) {
                              int p = q.phase, s = q.top, b = q.base;
                              sum += (p & 0xffffffffL) | ((long)b << 32);
                              if ((p & IDLE) == 0 || s - b > 0) {
                                  if ((i & 1) == 0 && compareAndSetCtl(c, c))
!                                     signalWork(null, 0);  // ensure live
!                                 return false;
                              }
                          }
                      }
                      swept = (phaseSum == (phaseSum = sum));
                  }
                  else if ((e & SHUTDOWN) == 0)
!                     return true;
                  else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
!                     interruptAll();                       // confirmed
!                     return true;                          // enable termination
                  }
                  else
                      break;                                // restart
              }
          }
--- 1810,146 ---
              if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
                  w.source = DEREGISTERED;
                  if (phase != 0) {         // else failed to start
                      replaceable = true;
                      if ((phase & IDLE) != 0)
!                         releaseAll();     // pool stopped before released
                  }
              }
          }
!         if (src != DEREGISTERED) {        // decrement counts
!             long c = ctl;
              do {} while (c != (c = compareAndExchangeCtl(
                                     c, ((RC_MASK & (c - RC_UNIT)) |
                                         (TC_MASK & (c - TC_UNIT)) |
                                         (LMASK & c)))));
!             if (w != null && w.top - w.base > 0) {
!                 for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
!                     try {                 // cancel remaining tasks
!                         t.cancel(false);
!                     } catch (Throwable ignore) {
!                     }
                  }
              }
          }
          if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
              WorkQueue[] qs; int n, i;     // remove index unless terminating
              long ns = w.nsteals & 0xffffffffL;
!             if ((lockRunState() & STOP) != 0)
!                 replaceable = false;
!             else if ((qs = queues) != null && (n = qs.length) > 0 &&
+                      qs[i = phase & SMASK & (n - 1)] == w) {
                  qs[i] = null;
                  stealCount += ns;         // accumulate steals
              }
              unlockRunState();
+             if (replaceable)
+                 signalWork();
          }
          if (ex != null)
              ForkJoinTask.rethrow(ex);
      }
  
      /**
!      * Releases an idle worker, or creates one if not enough exist.
       */
!     final void signalWork() {
          int pc = parallelism;
          for (long c = ctl;;) {
              WorkQueue[] qs = queues;
              long ac = (c + RC_UNIT) & RC_MASK, nc;
              int sp = (int)c, i = sp & SMASK;
!             if ((short)(c >>> RC_SHIFT) >= pc)
+                 break;
+             if (qs == null)
+                 break;
+             if (qs.length <= i)
                  break;
              WorkQueue w = qs[i], v = null;
+             Thread t = null;
              if (sp == 0) {
                  if ((short)(c >>> TC_SHIFT) >= pc)
                      break;
                  nc = ((c + TC_UNIT) & TC_MASK);
              }
!             else if ((v = w) == null)
                  break;
!             else {
+                 t = v.owner;
                  nc = (v.stackPred & LMASK) | (c & TC_MASK);
+             }
              if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
                  if (v == null)
                      createWorker();
                  else {
                      v.phase = sp;
                      if (v.parking != 0)
!                         U.unpark(t);
                  }
                  break;
              }
          }
      }
  
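The deregistration and signalWork paths above pack two 16-bit counts (release count RC, total count TC) plus a 32-bit idle-stack word into the single 64-bit `ctl` field, and update individual lanes with masked arithmetic such as `(RC_MASK & (c - RC_UNIT))`. The following standalone sketch re-creates that lane arithmetic with the same constant names; it is an illustration of the encoding, not the pool's code:

```java
// Illustrative re-creation of the ctl lane arithmetic: two 16-bit
// counts in the upper half of a long, 32 bits of other state below.
public class CtlLanes {
    static final int  RC_SHIFT = 48, TC_SHIFT = 32;
    static final long RC_UNIT  = 1L << RC_SHIFT, RC_MASK = 0xffffL << RC_SHIFT;
    static final long TC_UNIT  = 1L << TC_SHIFT, TC_MASK = 0xffffL << TC_SHIFT;
    static final long LMASK    = 0xffffffffL;   // low 32 bits

    static long pack(int rc, int tc, int low) {
        return ((long)(rc & 0xffff) << RC_SHIFT) |
               ((long)(tc & 0xffff) << TC_SHIFT) | (low & LMASK);
    }
    static int rc(long c)  { return (short)(c >>> RC_SHIFT); }
    static int tc(long c)  { return (short)(c >>> TC_SHIFT); }
    static int low(long c) { return (int)c; }

    // Decrement both counts in one step, as deregisterWorker does;
    // masking each lane keeps a borrow from leaking into a neighbor.
    static long decrementCounts(long c) {
        return (RC_MASK & (c - RC_UNIT)) |
               (TC_MASK & (c - TC_UNIT)) |
               (LMASK & c);
    }

    public static void main(String[] args) {
        long c = pack(3, 5, 42);
        long d = decrementCounts(c);
        System.out.println(rc(d) + " " + tc(d) + " " + low(d)); // 2 4 42
    }
}
```

In the pool itself such updates are applied with `compareAndExchangeCtl` in a retry loop, so each lane change is atomic with respect to concurrent signallers.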
      /**
       * Reactivates the given worker, and possibly others if not top of
       * ctl stack. Called only during shutdown to ensure release on
       * termination.
       */
!     private void releaseAll() {
          for (long c = ctl;;) {
              WorkQueue[] qs; WorkQueue v; int sp, i;
!             if ((sp = (int)c) == 0 || (qs = queues) == null ||
!                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
                  break;
              if (c == (c = compareAndExchangeCtl(
                            c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
                                (v.stackPred & LMASK))))) {
                  v.phase = sp;
!                 U.unpark(v.owner);
              }
          }
      }
  
      /**
       * Internal version of isQuiescent and related functionality.
!      * @return positive if stopping or terminating (setting STOP
!      * when quiescent and shutdown is enabled), zero if all workers
!      * are inactive and submission queues are empty and unlocked
!      * but shutdown is not enabled, negative if any worker is
!      * active or any queue is nonempty
       */
!     private int quiescent() {
          outer: for (;;) {
              long phaseSum = 0L;
              boolean swept = false;
              for (int e, prevRunState = 0; ; prevRunState = e) {
                  long c = ctl;
                  if (((e = runState) & STOP) != 0)
!                     return 1;                         // terminating
                  else if ((c & RC_MASK) > 0L)
!                     return -1;                        // at least one active
                  else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
                      long sum = c;
                      WorkQueue[] qs = queues; WorkQueue q;
                      int n = (qs == null) ? 0 : qs.length;
                      for (int i = 0; i < n; ++i) {         // scan queues
                          if ((q = qs[i]) != null) {
                              int p = q.phase, s = q.top, b = q.base;
                              sum += (p & 0xffffffffL) | ((long)b << 32);
                              if ((p & IDLE) == 0 || s - b > 0) {
                                  if ((i & 1) == 0 && compareAndSetCtl(c, c))
!                                     signalWork();
!                                 return -1;
                              }
                          }
                      }
                      swept = (phaseSum == (phaseSum = sum));
                  }
                  else if ((e & SHUTDOWN) == 0)
!                     return 0;
                  else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
!                     releaseAll();                         // confirmed
!                     return 1;                             // enable termination
                  }
                  else
                      break;                                // restart
              }
          }

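quiescent() cannot stop the world to check emptiness, so it relies on a validation sweep: each scan folds every queue's phase and base into a checksum, and a scan is trusted only when two consecutive sweeps produce the same sum with an unchanged runState. A toy version of that double-sweep idiom (hypothetical names; the real check also folds in ctl and rechecks runState):

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Toy "double sweep" check: concurrently updated counters are deemed
// stable only when two consecutive scans observe identical checksums.
public class SweepCheck {
    static boolean stable(AtomicIntegerArray a, int maxSweeps) {
        long prev = -1L;
        for (int s = 0; s < maxSweeps; ++s) {
            long sum = 0L;
            for (int i = 0; i < a.length(); ++i)
                sum += (long)a.get(i) * (i + 1); // position-weighted sum
            if (sum == prev)
                return true;       // two matching sweeps: stable
            prev = sum;
        }
        return false;              // kept changing; caller must retry
    }
}
```

The position weighting plays the same role as combining `(p & 0xffffffffL) | ((long)b << 32)` above: it makes two scans that saw different states very unlikely to collide on the same checksum.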
*** 2022,151 ***
       * See above for explanation.
       *
       * @param w caller's WorkQueue (may be null on failed initialization)
       */
      final void runWorker(WorkQueue w) {
!         if (w != null) {
!             int phase = w.phase, r = w.stackPred; // seed from registerWorker
-             long window = (long)((r >>> 16) & SMASK) | NO_HISTORY;
              do {
                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
!             } while ((runState & STOP) == 0 &&
!                      (((window = scan(w, window, r)) < 0L ||
!                        ((phase = awaitWork(w, phase)) & IDLE) == 0)));
          }
      }
  
      /**
       * Scans for and if found executes top-level tasks: Tries to poll
!      * each queue starting at initial index with random stride,
!      * returning next scan window and retry indicator.
       *
       * @param w caller's WorkQueue
-      * @param window up to three queue indices
       * @param r random seed
!      * @return the next window to use, with RESCAN set for rescan
       */
!     private long scan(WorkQueue w, long window, int r) {
!         WorkQueue[] qs = queues;
!         int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
!         long next = window & ~RESCAN;
!         outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
!             int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
!             if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
!                 (a = q.array) != null && (cap = a.length) > 0) {
!                 for (int b = q.base;;) {
!                     int nb = b + 1, nk = nb & (cap - 1), k;
!                     ForkJoinTask<?> t = a[k = b & (cap - 1)];
-                     U.loadFence();                // re-read b and t
-                     if (b == (b = q.base)) {      // else inconsistent; retry
-                         Object o;
-                         if (t == null)
-                             o = a[k];
-                         else if (t == (o = U.compareAndExchangeReference(
-                                            a, slotOffset(k), t, null))) {
-                             q.updateBase(nb);
-                             next = RESCAN | ((window << 16) & HMASK) | j;
-                             if (window != next && a[nk] != null)
-                                 signalWork(a, nk); // limit propagation
-                             if (w != null)        // always true
-                                 w.topLevelExec(t, q, j);
-                             break outer;
-                         }
-                         if (o == null) {
-                             if (next >= 0L && a[nk] != null)
-                                 next |= RESCAN;
                              break;
                          }
                      }
                  }
              }
          }
!         return next;
      }
  
      /**
!      * Tries to inactivate, and if successful, awaits signal or termination.
       *
       * @param w the worker (may be null if already terminated)
       * @param phase current phase
!      * @return current phase, with IDLE set if worker should exit
!      */
!     private int awaitWork(WorkQueue w, int phase) {
!         boolean quiet;                           // true if possibly quiescent
!         int active = phase + (IDLE << 1), p = phase | IDLE, e;
!         if (w != null) {
!             w.phase = p;                         // deactivate
!             long np = active & LMASK, pc = ctl;  // try to enqueue
-             long qc = np | ((pc - RC_UNIT) & UMASK);
              w.stackPred = (int)pc;               // set ctl stack link
!             if (pc != (pc = compareAndExchangeCtl(pc, qc))) {
!                 qc = np | ((pc - RC_UNIT) & UMASK);
!                 w.stackPred = (int)pc;           // retry once
!                 if (pc != (pc = compareAndExchangeCtl(pc, qc)))
-                     p = w.phase = phase;         // back out
-             }
-             if (p != phase && ((e = runState) & STOP) == 0 &&
-                 (!(quiet = (qc & RC_MASK) <= 0L) || (e & SHUTDOWN) == 0 ||
-                  !(quiet = quiescent()) || (runState & STOP) == 0)) {
-                 long deadline = 0L;              // not terminating
-                 if (quiet) {                     // use timeout if trimmable
-                     int nt = (short)(qc >>> TC_SHIFT);
-                     long delay = keepAlive;      // scale if not at target
-                     if (nt != (nt = Math.max(nt, parallelism)) && nt > 0)
-                         delay = Math.max(TIMEOUT_SLOP, delay / nt);
-                     if ((deadline = delay + System.currentTimeMillis()) == 0L)
-                         deadline = 1L;           // avoid zero
-                 }
-                 boolean release = quiet;
                  WorkQueue[] qs = queues;         // recheck queues
                  int n = (qs == null) ? 0 : qs.length;
                  for (int l = -n, j = active; l < n; ++l, ++j) {
!                     WorkQueue q; ForkJoinTask<?>[] a; int cap;
!                     if ((p = w.phase) == active) // interleave signal checks
!                         break;
!                     if ((q = qs[j & (n - 1)]) != null &&
!                         (a = q.array) != null && (cap = a.length) > 0 &&
!                         a[q.base & (cap - 1)] != null) {
!                         if (release && qc == ctl && compareAndSetCtl(qc, pc)) {
                              p = w.phase = active;
                              break;               // possible missed signal
                          }
-                         release = true;          // track multiple or reencounter
                      }
                      Thread.onSpinWait();         // reduce memory traffic
                  }
-                 if (p != active) {               // emulate LockSupport.park
-                     LockSupport.setCurrentBlocker(this);
-                     w.parking = 1;
-                     for (;;) {
-                         if ((runState & STOP) != 0 || (p = w.phase) == active)
-                             break;
-                         U.park(deadline != 0L, deadline);
-                         if ((p = w.phase) == active || (runState & STOP) != 0)
-                             break;
-                         Thread.interrupted();    // clear for next park
-                         if (deadline != 0L && TIMEOUT_SLOP >
-                             deadline - System.currentTimeMillis()) {
-                             long sp = w.stackPred & LMASK, c = ctl;
-                             long nc = sp | (UMASK & (c - TC_UNIT));
-                             if (((int)c & SMASK) == (active & SMASK) &&
-                                 compareAndSetCtl(c, nc)) {
-                                 w.source = DEREGISTERED;
-                                 w.phase = active;
-                                 break;           // trimmed on timeout
-                             }
-                             deadline = 0L;       // no longer trimmable
-                         }
-                     }
-                     w.parking = 0;
-                     LockSupport.setCurrentBlocker(null);
-                 }
              }
          }
          return p;
      }
  
      /**
       * Scans for and returns a polled task, if available.  Used only
       * for untracked polls. Begins scan at a random index to avoid
       * systematic unfairness.
       *
--- 1960,188 ---
       * See above for explanation.
       *
       * @param w caller's WorkQueue (may be null on failed initialization)
       */
      final void runWorker(WorkQueue w) {
!         if (w != null) {              // use seed from registerWorker
!             int phase = w.phase, r = w.stackPred, cfg = w.config, stop;
              do {
+                 stop = runState & STOP;
                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
!             } while (stop == 0 &&
!                      (scan(w, r, cfg) ||
!                       ((phase = deactivate(w, phase)) & IDLE) == 0 ||
+                       ((phase = awaitWork(w, phase)) & IDLE) == 0));
          }
      }
  
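The reseeding step in runWorker is Marsaglia's 32-bit xorshift (shift triple 13, 17, 5): cheap, stateless apart from the seed, with period 2^32 - 1 over nonzero seeds, and it never maps a nonzero seed to zero. A standalone sketch of the same update:

```java
// Marsaglia xorshift32, the same update runWorker applies to r.
public class XorShift32 {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    public static void main(String[] args) {
        int r = 0x9E3779B9;          // any nonzero seed works
        for (int i = 0; i < 5; ++i) {
            r = next(r);
            System.out.println(Integer.toHexString(r));
        }
    }
}
```

Because zero is a fixed point, worker seeds are derived from registration state that is never zero.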
      /**
       * Scans for and if found executes top-level tasks: Tries to poll
!      * each queue starting at a random index with a random stride,
!      * returning a retry indicator.
       *
       * @param w caller's WorkQueue
       * @param r random seed
!      * @param cfg config bits
+      * @return true for rescan
       */
!     private boolean scan(WorkQueue w, int r, int cfg) {
!         WorkQueue[] qs; int n;
!         boolean rescan = false;
!         if ((qs = queues) != null && (n = qs.length) > 0 && w != null) {
!             int i = r, step = (r >>> 16) | 1, nstolen = 0;
!             sweep: for (int l = n; l > 0; --l, i += step) {
!                 int j; WorkQueue q;
!                 if ((q = qs[j = i & (n - 1)]) != null) {
!                     for (int retries = 0;;) {
!                         int cap, b, k, nb; ForkJoinTask<?>[] a;
!                         if ((a = q.array) == null || (cap = a.length) <= 0)
                              break;
+                         long kp = slotOffset(k = (cap - 1) & (b = q.base));
+                         int nk = (nb = b + 1) & (cap - 1); // next slot
+                         int sk = (b + 2) & (cap - 1);      // 2nd slot ahead
+                         ForkJoinTask<?> t = a[k];
+                         U.loadFence();
+                         if (q.base != b)
+                             ;                             // inconsistent
+                         else if (t == null) {
+                             if (a[sk] == null && a[nk] == null && a[k] == null)
+                                 break;                    // probably empty
+                             U.loadFence();                // reread
+                         }
+                         else if (U.compareAndSetReference(a, kp, t, null)) {
+                             q.base = nb;
+                             w.source = j;                 // volatile write
+                             rescan = true;
+                             if (nstolen++ == 0 && a[nk] != null)
+                                 signalWork();             // propagate signal
+                             w.topLevelExec(t, cfg);
+                             if (q.base == nb)
+                                 continue;                 // no other pollers
+                         }
+                         if (++retries >= MAX_SCAN_RETRIES) {
+                             rescan = true;
+                             break sweep;                  // randomly move
                          }
                      }
                  }
              }
+             if (nstolen != 0)
+                 w.nsteals += nstolen;
          }
!         return rescan;
      }
  
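scan's traversal starts at `i = r` with `step = (r >>> 16) | 1` and masks indices with `n - 1`. Because the queues array length n is always a power of two and the step is forced odd, the step is coprime to n, so n increments of i visit every slot exactly once. A quick demonstration of that coverage property (illustrative class, not pool code):

```java
import java.util.HashSet;
import java.util.Set;

// Shows that an odd stride over a power-of-two table is a full
// permutation of its indices, the property scan() relies on.
public class StrideCoverage {
    static Set<Integer> visited(int r, int n) { // n must be a power of two
        int step = (r >>> 16) | 1;              // force odd, as in scan()
        Set<Integer> seen = new HashSet<>();
        for (int l = n, i = r; l > 0; --l, i += step)
            seen.add(i & (n - 1));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(visited(0xCAFEBABE, 16).size()); // 16: all slots hit
    }
}
```

Deriving the stride from a different part of the seed than the start index keeps two colliding scanners from also colliding on every subsequent probe.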
      /**
!      * Deactivates after failing to find work; reactivates on ctl
+      * contention, signal, or possible missed signal.
       *
       * @param w the worker (may be null if already terminated)
       * @param phase current phase
!      * @return current phase, or IDLE for exit
!      */
!     private int deactivate(WorkQueue w, int phase) {
!         int p = IDLE;
!         if ((runState & STOP) == 0 && w != null) {
!             int active = phase + (IDLE << 1);
!             w.phase = phase | IDLE;
!             long pc = ctl, qc = (active & LMASK) | ((pc - RC_UNIT) & UMASK);
              w.stackPred = (int)pc;               // set ctl stack link
!             if (!compareAndSetCtl(pc, qc))       // try to enqueue
!                 p = w.phase = phase;             // back out on contention
!             else if ((qc & RC_MASK) > 0L || quiescent() <= 0) {
!                 boolean release = false;
                  WorkQueue[] qs = queues;         // recheck queues
                  int n = (qs == null) ? 0 : qs.length;
                  for (int l = -n, j = active; l < n; ++l, ++j) {
!                     WorkQueue q = qs[j & (n - 1)];
!                     if (((p = w.phase) & IDLE) == 0)
!                         break;                   // interleave signal checks
!                     if (q != null && q.top - q.base > 0) {
!                         if (!release)            // need multiple or reencounter
!                             release = true;
!                         else if (ctl == qc && compareAndSetCtl(qc, pc)) {
                              p = w.phase = active;
                              break;               // possible missed signal
                          }
                      }
                      Thread.onSpinWait();         // reduce memory traffic
                  }
              }
          }
          return p;
      }
  
+     /**
+      * Awaits signal or termination.
+      *
+      * @param w the worker (may be null if already terminated)
+      * @param phase current phase or IDLE if already exiting
+      * @return current phase, with IDLE set if worker should exit
+      */
+     private int awaitWork(WorkQueue w, int phase) {
+         if (phase != IDLE && w != null) {
+             long deadline = 0L, c;       // Use timeout if quiescent
+             long d = (w.source == INVALID_ID) ? TIMEOUT_SLOP : keepAlive;
+             if (((c = ctl) & RC_MASK) <= 0L && (int)c == phase + IDLE &&
+                 (deadline = d + System.currentTimeMillis()) == 0L)
+                 deadline = 1L;           // avoid zero
+             LockSupport.setCurrentBlocker(this);
+             w.parking = 1;               // enable unpark
+             for (;;) {
+                 if ((runState & STOP) != 0)
+                     break;
+                 if (((phase = w.phase) & IDLE) == 0)
+                     break;
+                 U.park(deadline != 0L, deadline);
+                 if (((phase = w.phase) & IDLE) == 0)
+                     break;
+                 if ((runState & STOP) != 0)
+                     break;
+                 Thread.interrupted();    // clear for next park
+                 if (deadline != 0L && TIMEOUT_SLOP >
+                     deadline - System.currentTimeMillis()) {
+                     if (tryTrim(w))
+                         break;
+                     deadline = 0L;       // no longer trimmable
+                 }
+             }
+             w.parking = 0;
+             LockSupport.setCurrentBlocker(null);
+         }
+         return phase;
+     }
+ 
+     /**
+      * Tries to remove and deregister a worker after timeout, and
+      * releases others to do the same. Fails if the worker was
+      * instead signalled first.
+      */
+     private boolean tryTrim(WorkQueue w) {
+         int phase;
+         if (w != null && ((phase = w.phase) & IDLE) != 0) {
+             long sp = w.stackPred & LMASK, c = ctl;
+             long nc = sp | (UMASK & (c - TC_UNIT));
+             int active = phase + IDLE;
+             if ((int)c == active && compareAndSetCtl(c, nc)) {
+                 WorkQueue[] vs; WorkQueue v; int vp, i;
+                 w.source = DEREGISTERED;
+                 w.phase = active;          // try to wake up next waiter
+                 if ((vp = (int)nc) != 0 && (vs = queues) != null &&
+                     vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
+                     compareAndSetCtl(
+                         nc, ((UMASK & (nc + RC_UNIT)) |
+                              (nc & TC_MASK) | (v.stackPred & LMASK)))) {
+                     v.source = INVALID_ID; // enable cascaded timeouts
+                     v.phase = vp;
+                     U.unpark(v.owner);
+                 }
+                 return true;
+             }
+         }
+         return false;
+     }
+ 
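deactivate, signalWork, and tryTrim together maintain a Treiber stack of idle workers threaded through ctl: the low 32 bits of ctl name the current top (its versioned phase in the real pool), and each worker's stackPred stores the previous top. A minimal sketch of that push/pop discipline using worker ids instead of phases (hypothetical class, just to show the CAS shape; the pool packs the counts into the same 64-bit word):

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal Treiber stack keyed by worker id (1..n), mirroring how idle
// workers link through ctl's low 32 bits and per-worker stackPred.
public class IdleStack {
    final int[] stackPred;                      // stackPred[id] = previous top
    final AtomicLong top = new AtomicLong(0L);  // 0 means empty, as sp == 0

    IdleStack(int nWorkers) { stackPred = new int[nWorkers + 1]; }

    void push(int id) {             // cf. deactivate: record link, then CAS
        long c;
        do {
            c = top.get();
            stackPred[id] = (int)c; // remember previous top
        } while (!top.compareAndSet(c, id));
    }

    int pop() {                     // cf. signalWork: CAS in the pred link
        for (;;) {
            long c = top.get();
            int sp = (int)c;
            if (sp == 0)
                return 0;           // empty
            if (top.compareAndSet(c, stackPred[sp] & 0xffffffffL))
                return sp;
        }
    }
}
```

The pool avoids the classic ABA hazard of this structure by embedding version bits in each worker's phase, so a re-pushed worker never reuses a stale top value.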
      /**
       * Scans for and returns a polled task, if available.  Used only
       * for untracked polls. Begins scan at a random index to avoid
       * systematic unfairness.
       *

*** 2215,12 ***
              WorkQueue[] qs; WorkQueue v; int i;
              if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
                  (v = qs[i]) != null &&
                  compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
                  v.phase = sp;
!                 if (v.parking != 0)
-                     U.unpark(v.owner);
                  stat = UNCOMPENSATE;
              }
          }
          else if (active > minActive && total >= pc) {   // reduce active workers
              if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
--- 2190,11 ---
              WorkQueue[] qs; WorkQueue v; int i;
              if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
                  (v = qs[i]) != null &&
                  compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
                  v.phase = sp;
!                 U.unpark(v.owner);
                  stat = UNCOMPENSATE;
              }
          }
          else if (active > minActive && total >= pc) {   // reduce active workers
              if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))

*** 2259,11 ***
       * @param task the task
       * @param w caller's WorkQueue
       * @param internal true if w is owned by a ForkJoinWorkerThread
       * @return task status on exit, or UNCOMPENSATE for compensated blocking
       */
- 
      final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
          if (w != null)
              w.tryRemoveAndExec(task, internal);
          int s = 0;
          if (task != null && (s = task.status) >= 0 && internal && w != null) {
--- 2233,10 ---

*** 2516,22 ***
       * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
       * @param interruptible true if return on interrupt
       * @return positive if quiescent, negative if interrupted, else 0
       */
      private int externalHelpQuiesce(long nanos, boolean interruptible) {
!         if (!quiescent()) {
              long startTime = System.nanoTime();
              long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
              for (int waits = 0;;) {
                  ForkJoinTask<?> t;
                  if (interruptible && Thread.interrupted())
                      return -1;
                  else if ((t = pollScan(false)) != null) {
                      waits = 0;
                      t.doExec();
                  }
!                 else if (quiescent())
                      break;
                  else if (System.nanoTime() - startTime > nanos)
                      return 0;
                  else if (waits == 0)
                      waits = MIN_SLEEP;
--- 2489,22 ---
       * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
       * @param interruptible true if return on interrupt
       * @return positive if quiescent, negative if interrupted, else 0
       */
      private int externalHelpQuiesce(long nanos, boolean interruptible) {
!         if (quiescent() < 0) {
              long startTime = System.nanoTime();
              long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
              for (int waits = 0;;) {
                  ForkJoinTask<?> t;
                  if (interruptible && Thread.interrupted())
                      return -1;
                  else if ((t = pollScan(false)) != null) {
                      waits = 0;
                      t.doExec();
                  }
!                 else if (quiescent() >= 0)
                      break;
                  else if (System.nanoTime() - startTime > nanos)
                      return 0;
                  else if (waits == 0)
                      waits = MIN_SLEEP;

*** 2752,50 ***
       * if no work and no active workers
       * @param enable if true, terminate when next possible
       * @return runState on exit
       */
      private int tryTerminate(boolean now, boolean enable) {
!         int e = runState;
          if ((e & STOP) == 0) {
              if (now) {
                  int s = lockRunState();
                  runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
                  if ((s & STOP) == 0)
                      interruptAll();
              }
!             else {
!                 int isShutdown = (e & SHUTDOWN);
!                 if (isShutdown == 0 && enable)
!                     getAndBitwiseOrRunState(isShutdown = SHUTDOWN);
!                 if (isShutdown != 0)
-                     quiescent();                 // may trigger STOP
-                 e = runState;
              }
          }
          if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
!             int r = (int)Thread.currentThread().threadId(); // stagger traversals
!             WorkQueue[] qs = queues;
!             int n = (qs == null) ? 0 : qs.length;
!             for (int l = n; l > 0; --l, ++r) {
!                 int j = r & SMASK & (n - 1); WorkQueue q; ForkJoinTask<?> t;
!                 while ((q = qs[j]) != null && q.source != DEREGISTERED &&
!                        (t = q.poll(null)) != null) {
!                     try {
!                         t.cancel(false);
!                     } catch (Throwable ignore) {
                      }
                  }
              }
              if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
                  if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
                      CountDownLatch done; SharedThreadContainer ctr;
                      if ((done = termination) != null)
                          done.countDown();
                      if ((ctr = container) != null)
                          ctr.close();
                  }
-                 e = runState;
              }
          }
          return e;
      }
  
--- 2725,52 ---
       * if no work and no active workers
       * @param enable if true, terminate when next possible
       * @return runState on exit
       */
      private int tryTerminate(boolean now, boolean enable) {
!         int e = runState, isShutdown;
          if ((e & STOP) == 0) {
              if (now) {
                  int s = lockRunState();
                  runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
                  if ((s & STOP) == 0)
                      interruptAll();
              }
!             else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) {
!                 if (isShutdown == 0)
!                     getAndBitwiseOrRunState(SHUTDOWN);
!                 if (quiescent() > 0)
!                     e = runState;
              }
          }
          if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
!             if ((ctl & RC_MASK) > 0L) {          // unless was quiescent
!                 int r = (int)Thread.currentThread().threadId();
!                 WorkQueue[] qs = queues;         // stagger traversals
!                 int n = (qs == null) ? 0 : qs.length;
!                 for (int l = n; l > 0; --l, ++r) {
!                     WorkQueue q; ForkJoinTask<?> t;
!                     if ((q = qs[r & (n - 1)]) != null &&
!                         q.source != DEREGISTERED) {
!                         while ((t = q.poll(null)) != null) {
!                             try {
+                                 t.cancel(false);
+                             } catch (Throwable ignore) {
+                             }
+                         }
                      }
                  }
              }
              if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
+                 e |= TERMINATED;
                  if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
                      CountDownLatch done; SharedThreadContainer ctr;
                      if ((done = termination) != null)
                          done.countDown();
                      if ((ctr = container) != null)
                          ctr.close();
                  }
              }
          }
          return e;
      }
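
The revised `tryTerminate` above is the private engine behind the public shutdown methods: in the JDK sources, `shutdown()` calls `tryTerminate(false, true)` (enable termination, complete when quiescent) and `shutdownNow()` calls `tryTerminate(true, true)` (set STOP, interrupt workers, and run the help-cancel loop over the queues). A minimal sketch of the externally observable behavior, using only public API:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class TerminateDemo {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.submit(() -> { });          // a trivial task

        // shutdown(): no new submissions; termination is "enabled"
        // and completes once the pool becomes quiescent.
        pool.shutdown();
        System.out.println(pool.awaitTermination(5, TimeUnit.SECONDS)); // true

        // shutdownNow(): sets STOP immediately; queued tasks are
        // cancelled (the help-cancel loop) and workers interrupted.
        ForkJoinPool pool2 = new ForkJoinPool(2);
        pool2.shutdownNow();
        System.out.println(pool2.awaitTermination(5, TimeUnit.SECONDS)); // true
    }
}
```

Note that the patch skips the cancellation traversal entirely when the release count is zero (`(ctl & RC_MASK) > 0L` is false), i.e. when the pool was already quiescent and no queued tasks can remain.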
  

*** 3499,11 ***
       * threads remain inactive.
       *
       * @return {@code true} if all threads are currently idle
       */
      public boolean isQuiescent() {
!         return quiescent();
      }
  
      /**
       * Returns an estimate of the total number of completed tasks that
       * were executed by a thread other than their submitter. The
--- 3474,11 ---
       * threads remain inactive.
       *
       * @return {@code true} if all threads are currently idle
       */
      public boolean isQuiescent() {
!         return quiescent() >= 0;
      }
  
      /**
       * Returns an estimate of the total number of completed tasks that
       * were executed by a thread other than their submitter. The
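
The change above reflects `quiescent()` now returning an int status rather than a boolean, with non-negative values meaning no active workers, so the public `isQuiescent()` becomes `quiescent() >= 0`. From the caller's side nothing changes; a short usage sketch with the public API:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class QuiescenceDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);

        // A freshly constructed pool has no tasks or active workers.
        System.out.println(pool.isQuiescent());   // true

        pool.submit(() -> { }).join();

        // awaitQuiescence blocks (possibly helping to run tasks)
        // until no threads are active or the timeout elapses.
        System.out.println(pool.awaitQuiescence(5, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```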

*** 3986,19 ***
--- 3961,21 ---
      /**
       * Invokes tryCompensate to create or re-activate a spare thread to
       * compensate for a thread that performs a blocking operation. When the
       * blocking operation is done then endCompensatedBlock must be invoked
       * with the value returned by this method to re-adjust the parallelism.
+      * @return value to use in endCompensatedBlock
       */
      final long beginCompensatedBlock() {
          int c;
          do {} while ((c = tryCompensate(ctl)) < 0);
          return (c == 0) ? 0L : RC_UNIT;
      }
  
      /**
       * Re-adjusts parallelism after a blocking operation completes.
+      * @param post value from beginCompensatedBlock
       */
      void endCompensatedBlock(long post) {
          if (post > 0L) {
              getAndAddCtl(post);
          }
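
`beginCompensatedBlock`/`endCompensatedBlock` are the internal machinery reached through the public `ForkJoinPool.managedBlock` entry point: the pool may activate or create a spare worker while one blocks, then readjusts the release count afterwards. A sketch using the public `ManagedBlocker` API (the `LatchBlocker` wrapper here is illustrative, not part of the JDK):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

public class BlockerDemo {
    // Wraps a CountDownLatch so the pool can compensate while we wait.
    static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        final CountDownLatch latch;
        LatchBlocker(CountDownLatch latch) { this.latch = latch; }
        public boolean block() throws InterruptedException {
            latch.await();
            return true;                       // blocking is complete
        }
        public boolean isReleasable() {
            return latch.getCount() == 0;      // no need to block at all
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        // When called from a worker thread, this is where the pool may
        // add a spare worker for the duration of the wait.
        ForkJoinPool.managedBlock(new LatchBlocker(latch));
        System.out.println("released");
    }
}
```

The added `@return`/`@param` javadoc in the hunk above documents the handshake: the value returned by `beginCompensatedBlock` (0L or RC_UNIT) is exactly what must be passed back to `endCompensatedBlock`.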