src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

*** 558,93 ***
       * required, we reduce write contention by ensuring that
       * signalWork invocations are prefaced with a fully fenced memory
       * access (which is usually needed anyway).
       *
       * Signalling. Signals (in signalWork) cause new or reactivated
!      * workers to scan for tasks.  Method signalWork and its callers
!      * try to approximate the unattainable goal of having the right
!      * number of workers activated for the tasks at hand, but must err
!      * on the side of too many workers vs too few to avoid stalls:
!      *
!      *  * If computations are purely tree structured, it suffices for
!      *    every worker to activate another when it pushes a task into
!      *    an empty queue, resulting in O(log(#threads)) steps to full
!      *    activation. Emptiness must be conservatively approximated,
!      *    which may result in unnecessary signals.  Also, to reduce
!      *    resource usages in some cases, at the expense of slower
!      *    startup in others, activation of an idle thread is preferred
!      *    over creating a new one, here and elsewhere.
!      *
!      *  * At the other extreme, if "flat" tasks (those that do not in
!      *    turn generate others) come in serially from only a single
!      *    producer, each worker taking a task from a queue should
-      *    propagate a signal if there are more tasks in that
-      *    queue. This is equivalent to, but generally faster than,
-      *    arranging that the stealer take multiple tasks, re-pushing one or
-      *    more on its own queue, and signalling (because its queue is
-      *    empty), also resulting in logarithmic full activation
-      *    time. If tasks do not engage in unbounded loops based on
-      *    the actions of other workers with unknown dependencies,
-      *    this form of propagation can be limited to one signal per
-      *    activation (phase change). We distinguish the cases by
-      *    further signalling only if the task is an InterruptibleTask
-      *    (see below), the only supported form of task that may do
-      *    so.
-      *
-      * * Because we don't know about usage patterns (or most commonly,
-      *    mixtures), we use both approaches, which present even more
-      *    opportunities to over-signal. (Failure to distinguish these
-      *    cases in terms of submission methods was arguably an early
-      *    design mistake.)  Note that in either of these contexts,
-      *    signals may be (and often are) unnecessary because active
-      *    workers continue scanning after running tasks without the
-      *    need to be signalled (which is one reason work stealing is
-      *    often faster than alternatives), so additional workers
-      *    aren't needed.
-      *
-      * * For rapidly branching tasks that require full pool resources,
-      *   oversignalling is OK, because signalWork will soon have no
-      *   more workers to create or reactivate. But for others (mainly
-      *   externally submitted tasks), overprovisioning may cause very
-      *   noticeable slowdowns due to contention and resource
-      *   wastage. We reduce impact by deactivating workers when
-      *   queues don't have accessible tasks, but reactivating and
-      *   rescanning if other tasks remain.
-      *
-      * * Despite these, signal contention and overhead effects still
-      *   occur during ramp-up and ramp-down of small computations.
       *
       * Scanning. Method runWorker performs top-level scanning for (and
       * execution of) tasks by polling a pseudo-random permutation of
       * the array (by starting at a given index, and using a constant
       * cyclically exhaustive stride.)  It uses the same basic polling
       * method as WorkQueue.poll(), but restarts with a different
!      * permutation on each invocation.  The pseudorandom generator
!      * need not have high-quality statistical properties in the long
       * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
!      * from ThreadLocalRandom probes, which are cheap and
-      * suffice. Each queue's polling attempts to avoid becoming stuck
-      * when other scanners/pollers stall.  Scans do not otherwise
-      * explicitly take into account core affinities, loads, cache
-      *    localities, etc. However, they do exploit temporal locality
-      * (which usually approximates these) by preferring to re-poll
-      * from the same queue after a successful poll before trying
-      * others, which also reduces bookkeeping, cache traffic, and
-      * scanning overhead. But it also reduces fairness, which is
-      * partially counteracted by giving up on detected interference
-      * (which also reduces contention when too many workers try to
-      * take small tasks from the same queue).
       *
       * Deactivation. When no tasks are found by a worker in runWorker,
!      * it tries to deactivate(), giving up (and rescanning) on "ctl"
!      * contention. To avoid missed signals, the method rescans and
!      * reactivates if there may have been a missed signal during
!      * deactivation. To reduce false-alarm reactivations
!      * while doing so, we scan multiple times (analogously to method
!      * quiescent()) before trying to reactivate.  Because idle workers
!      * are often not yet blocked (parked), we use a WorkQueue field to
!      * advertise that a waiter actually needs unparking upon signal.
       *
       * Quiescence. Workers scan looking for work, giving up when they
       * don't find any, without being sure that none are available.
       * However, some required functionality relies on consensus about
       * quiescence (also termination, discussed below). The count
--- 558,74 ---
       * required, we reduce write contention by ensuring that
       * signalWork invocations are prefaced with a fully fenced memory
       * access (which is usually needed anyway).
       *
       * Signalling. Signals (in signalWork) cause new or reactivated
!      * workers to scan for tasks.  SignalWork is invoked in two cases:
!      * (1) When a task is pushed onto an empty queue, and (2) When a
!      * worker takes a top-level task from a queue that has additional
!      * tasks. Together, these suffice in O(log(#threads)) steps to
!      * fully activate with at least enough workers, and ideally no
!      * more than required.  This ideal is unobtainable: Callers do not
!      * know whether another worker will finish its current task and
!      * poll for others without need of a signal (which is otherwise an
!      * advantage of work-stealing vs other schemes), and also must
!      * conservatively estimate the triggering conditions of emptiness
!      * or non-emptiness; all of which usually cause more activations
!      * than necessary (see below). (Method signalWork is also used as
!      * a failsafe in case of thread failures in deregisterWorker, to
!      * activate or create a new worker as a replacement.)
!      *
!      * Top-Level scheduling
!      * ====================
       *
       * Scanning. Method runWorker performs top-level scanning for (and
       * execution of) tasks by polling a pseudo-random permutation of
       * the array (by starting at a given index, and using a constant
       * cyclically exhaustive stride.)  It uses the same basic polling
       * method as WorkQueue.poll(), but restarts with a different
!      * permutation on each rescan.  The pseudorandom generator need
!      * not have high-quality statistical properties in the long
       * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
!      * from ThreadLocalRandom probes, which are cheap and suffice.
       *
       * Deactivation. When no tasks are found by a worker in runWorker,
!      * it invokes deactivate, which sets an IDLE phase and may then
!      * block in awaitWork.  Avoiding missed signals during
!      * deactivation requires a (conservative) rescan, reactivating if
!      * there may be tasks to poll. Because idle workers are often not
!      * yet blocked (parked), we use a WorkQueue field to advertise
!      * that a waiter actually needs unparking upon signal.
!      *
!      * When tasks are constructed as (recursive) DAGs, top-level
+      * scanning is usually infrequent, and doesn't encounter most
+      * of the following problems addressed by runWorker and awaitWork:
+      *
+      * Locality. Polls are organized into "runs", continuing until
+      * empty or contended, while also minimizing interference by
+      * postponing bookkeeping to ends of runs. This may reduce
+      * fairness.
+      *
+      * Contention. When many workers try to poll few queues, they
+      * often collide, generating CAS failures and disrupting locality
+      * of workers already running their tasks. This also leads to
+      * stalls when tasks cannot be taken because other workers have
+      * not finished poll operations, which is detected by reading
+      * ahead in queue arrays. In both cases, workers restart scans in a
+      * way that approximates randomized backoff.
+      *
+      * Oversignalling. When many short top-level tasks are present in
+      * a small number of queues, the above signalling strategy may
+      * activate many more workers than needed, worsening locality and
+      * contention problems, while also generating more global
+      * contention (field ctl is CASed on every activation and
+      * deactivation). We filter out (both in runWorker and
+      * signalWork) attempted signals that are surely not needed
+      * because the signalled tasks are already taken.
+      *
+      * Shutdown and Quiescence
+      * =======================
       *
       * Quiescence. Workers scan looking for work, giving up when they
       * don't find any, without being sure that none are available.
       * However, some required functionality relies on consensus about
       * quiescence (also termination, discussed below). The count

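A minimal standalone sketch (illustrative class and seed only, not
pool code) of the scan ordering described in the Scanning notes above:
one Marsaglia XorShift step picks an origin and an odd stride, and
since any odd stride is coprime to a power-of-two table length, a
single sweep visits every queue index exactly once.

    // Sketch of the pseudo-random, cyclically exhaustive scan order.
    public final class ScanOrderDemo {
        public static void main(String[] args) {
            int r = 0x9E3779B9;                       // any nonzero seed
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // XorShift step
            int n = 8;                                // power-of-two length
            int i = r, step = (r >>> 16) | 1;         // origin and odd stride
            for (int l = n; l > 0; --l, i += step)
                System.out.print((i & (n - 1)) + " "); // each index once
            System.out.println();
        }
    }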
*** 890,13 ***
       * InterruptibleTasks include a "runner" field (implemented
       * similarly to FutureTask) to support cancel(true).  Upon pool
       * shutdown, runners are interrupted so they can cancel. Since
       * external joining callers never run these tasks, they must await
       * cancellation by others, which can occur along several different
!      * paths. The inability to rely on caller-runs may also require
-      * extra signalling (resulting in scanning and contention), so it
-      * is done only conditionally in methods push and runWorker.
       *
       * Across these APIs, rules for reporting exceptions for tasks
       * with results accessed via join() differ from those via get(),
       * which differ from those invoked using pool submit methods by
       * non-workers (which comply with Future.get() specs). Internal
--- 871,11 ---
       * InterruptibleTasks include a "runner" field (implemented
       * similarly to FutureTask) to support cancel(true).  Upon pool
       * shutdown, runners are interrupted so they can cancel. Since
       * external joining callers never run these tasks, they must await
       * cancellation by others, which can occur along several different
!      * paths.
       *
       * Across these APIs, rules for reporting exceptions for tasks
       * with results accessed via join() differ from those via get(),
       * which differ from those invoked using pool submit methods by
       * non-workers (which comply with Future.get() specs). Internal

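The "runner" field mentioned above can be pictured with this condensed
sketch; the class and names are hypothetical stand-ins for the
FutureTask-like mechanics of InterruptibleTasks, not the actual code.

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical sketch: record the executing thread so that
    // cancel(true) can interrupt it, as FutureTask does.
    final class RunnerSketch implements Runnable {
        private final AtomicReference<Thread> runner = new AtomicReference<>();
        public void run() {
            runner.set(Thread.currentThread());        // publish runner
            try {
                for (int i = 0; i < 1_000_000; ++i) {  // stand-in for work
                    if (Thread.currentThread().isInterrupted())
                        return;                        // cooperative cancel
                }
            } finally {
                runner.set(null);                      // clear on exit
            }
        }
        void cancel(boolean mayInterruptIfRunning) {
            Thread t;
            if (mayInterruptIfRunning && (t = runner.get()) != null)
                t.interrupt();                         // cancel(true) path
        }
    }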
*** 959,28 ***
       * would be, while also reducing total footprint vs using
       * multiple @Contended regions, which tends to slow down
       * less-contended applications. To help arrange this, some
       * non-reference fields are declared as "long" even when ints or
       * shorts would suffice.  For class WorkQueue, an
!      * embedded @Contended region segregates fields most heavily
!      * updated by owners from those most commonly read by stealers or
!      * other management.
       *
       * Initial sizing and resizing of WorkQueue arrays is an even more
       * delicate tradeoff because the best strategy systematically
       * varies across garbage collectors. Small arrays are better for
       * locality and reduce GC scan time, but large arrays reduce both
       * direct false-sharing and indirect cases due to GC bookkeeping
       * (cardmarks etc), and reduce the number of resizes, which are
       * not especially fast because they require atomic transfers.
!      * Currently, arrays for workers are initialized to be just large
!      * enough to avoid resizing in most tree-structured tasks, but
!      * larger for external queues where both false-sharing problems
!      * and the need for resizing are more common. (Maintenance note:
!      * any changes in fields, queues, or their uses, or JVM layout
-      * policies, must be accompanied by re-evaluation of these
-      * placement and sizing decisions.)
       *
       * Style notes
       * ===========
       *
       * Memory ordering relies mainly on atomic operations (CAS,
--- 938,30 ---
       * would be, while also reducing total footprint vs using
       * multiple @Contended regions, which tends to slow down
       * less-contended applications. To help arrange this, some
       * non-reference fields are declared as "long" even when ints or
       * shorts would suffice.  For class WorkQueue, an
!      * embedded @Contended region isolates the very busy top index,
!      * and another segregates status and bookkeeping fields written
!      * (mostly) by owners that would otherwise interfere with reads of
+      * the array, top, and base fields. There are other variables commonly
+      * contributing to false-sharing-related performance issues
+      * (including fields of class Thread), but we can't do much about
+      * this except try to minimize access.
       *
       * Initial sizing and resizing of WorkQueue arrays is an even more
       * delicate tradeoff because the best strategy systematically
       * varies across garbage collectors. Small arrays are better for
       * locality and reduce GC scan time, but large arrays reduce both
       * direct false-sharing and indirect cases due to GC bookkeeping
       * (cardmarks etc), and reduce the number of resizes, which are
       * not especially fast because they require atomic transfers.
!      * Currently, arrays are initialized to be just large enough to
!      * avoid resizing in most tree-structured tasks, but grow rapidly
!      * (quadrupling while small) until large.  (Maintenance note: any
!      * changes in fields, queues, or their uses, or JVM layout
!      * policies, must be accompanied by re-evaluation of these
!      * placement and sizing decisions.)
       *
       * Style notes
       * ===========
       *
       * Memory ordering relies mainly on atomic operations (CAS,

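The revised sizing policy (see growArray later in this diff) starts at
INITIAL_QUEUE_CAPACITY (1 << 6), quadruples until 1 << 16, and doubles
thereafter; this small sketch (illustrative class name) traces the
schedule.

    // Traces the resize schedule used by the new growArray.
    public final class GrowthPolicyDemo {
        public static void main(String[] args) {
            for (int cap = 1 << 6; cap <= (1 << 20); ) {
                System.out.println("capacity " + cap);
                cap = (cap >= 1 << 16) ? cap << 1 : cap << 2; // growArray rule
            }
        }
    }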
*** 1059,21 ***
       * running out of resources needed to do so.
       */
      static final int DEFAULT_COMMON_MAX_SPARES = 256;
  
      /**
!      * Initial capacity of work-stealing queue array for workers.
       * Must be a power of two, at least 2. See above.
       */
      static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
  
-     /**
-      * Initial capacity of work-stealing queue array for external queues.
-      * Must be a power of two, at least 2. See above.
-      */
-     static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
- 
      // conversions among short, int, long
      static final int  SMASK           = 0xffff;      // (unsigned) short bits
      static final long LMASK           = 0xffffffffL; // lower 32 bits of long
      static final long UMASK           = ~LMASK;      // upper 32 bits
  
--- 1040,15 ---
       * running out of resources needed to do so.
       */
      static final int DEFAULT_COMMON_MAX_SPARES = 256;
  
      /**
!      * Initial capacity of work-stealing queue array.
       * Must be a power of two, at least 2. See above.
       */
      static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
  
      // conversions among short, int, long
      static final int  SMASK           = 0xffff;      // (unsigned) short bits
      static final long LMASK           = 0xffffffffL; // lower 32 bits of long
      static final long UMASK           = ~LMASK;      // upper 32 bits
  

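A small demo (arbitrary value, illustrative class name) of how these
masks split the packed 64-bit words used throughout: LMASK/UMASK
select the halves of a long, and SMASK extracts an unsigned short.

    public final class MaskDemo {
        static final int  SMASK = 0xffff;
        static final long LMASK = 0xffffffffL;
        static final long UMASK = ~LMASK;
        public static void main(String[] args) {
            long c = 0x1234_5678_9abc_def0L;
            System.out.printf("lower=%08x upper=%08x short=%04x%n",
                              c & LMASK, (c & UMASK) >>> 32, (int)c & SMASK);
        }
    }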
*** 1201,23 ***
          final ForkJoinWorkerThread owner; // null if shared
          ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
          int base;                  // index of next slot for poll
          final int config;          // mode bits
  
!         // fields otherwise causing more unnecessary false-sharing cache misses
-         @jdk.internal.vm.annotation.Contended("w")
          int top;                   // index of next slot for push
          @jdk.internal.vm.annotation.Contended("w")
          volatile int phase;        // versioned active status
          @jdk.internal.vm.annotation.Contended("w")
          int stackPred;             // pool stack (ctl) predecessor link
          @jdk.internal.vm.annotation.Contended("w")
          volatile int source;       // source queue id (or DROPPED)
          @jdk.internal.vm.annotation.Contended("w")
          int nsteals;               // number of steals from other queues
-         @jdk.internal.vm.annotation.Contended("w")
-         volatile int parking;      // nonzero if parked in awaitWork
  
          // Support for atomic operations
          private static final Unsafe U;
          private static final long PHASE;
          private static final long BASE;
--- 1176,24 ---
          final ForkJoinWorkerThread owner; // null if shared
          ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
          int base;                  // index of next slot for poll
          final int config;          // mode bits
  
!         @jdk.internal.vm.annotation.Contended("t") // segregate
          int top;                   // index of next slot for push
+ 
+         // fields otherwise causing more unnecessary false-sharing cache misses
          @jdk.internal.vm.annotation.Contended("w")
          volatile int phase;        // versioned active status
          @jdk.internal.vm.annotation.Contended("w")
          int stackPred;             // pool stack (ctl) predecessor link
          @jdk.internal.vm.annotation.Contended("w")
+         volatile int parking;      // nonzero if parked in awaitWork
+         @jdk.internal.vm.annotation.Contended("w")
          volatile int source;       // source queue id (or DROPPED)
          @jdk.internal.vm.annotation.Contended("w")
          int nsteals;               // number of steals from other queues
  
          // Support for atomic operations
          private static final Unsafe U;
          private static final long PHASE;
          private static final long BASE;

*** 1246,15 ***
           * Constructor. For internal queues, most fields are initialized
           * upon thread start in pool.registerWorker.
           */
          WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
                    boolean clearThreadLocals) {
-             array = new ForkJoinTask<?>[owner == null ?
-                                         INITIAL_EXTERNAL_QUEUE_CAPACITY :
-                                         INITIAL_QUEUE_CAPACITY];
-             this.owner = owner;
              this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
          }
  
          /**
           * Returns an exportable index (used by ForkJoinWorkerThread).
           */
--- 1222,15 ---
           * Constructor. For internal queues, most fields are initialized
           * upon thread start in pool.registerWorker.
           */
          WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
                    boolean clearThreadLocals) {
              this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
+             if ((this.owner = owner) == null) {
+                 array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
+                 phase = id | IDLE;
+             }
          }
  
          /**
           * Returns an exportable index (used by ForkJoinWorkerThread).
           */

*** 1278,43 ***
           * @param internal if caller owns this queue
           * @throws RejectedExecutionException if array could not be resized
           */
          final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
              int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
!             if ((a = array) != null && (cap = a.length) > 0 && // else disabled
-                 task != null) {
-                 int pk = task.noUserHelp() + 1;             // prev slot offset
                  if ((room = (m = cap - 1) - (s - b)) >= 0) {
                      top = s + 1;
                      long pos = slotOffset(m & s);
                      if (!internal)
                          U.putReference(a, pos, task);       // inside lock
                      else
                          U.getAndSetReference(a, pos, task); // fully fenced
!                     if (room == 0)                          // resize
!                         growArray(a, cap, s);
                  }
                  if (!internal)
                      unlockPhase();
                  if (room < 0)
                      throw new RejectedExecutionException("Queue capacity exceeded");
!                 if ((room == 0 || a[m & (s - pk)] == null) &&
!                     pool != null)
!                     pool.signalWork();   // may have appeared empty
              }
          }
  
          /**
           * Resizes the queue array unless out of memory.
           * @param a old array
           * @param cap old array capacity
           * @param s current top
           */
!         private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
!             int newCap = cap << 1;
              if (a != null && a.length == cap && cap > 0 && newCap > 0) {
-                 ForkJoinTask<?>[] newArray = null;
                  try {
                      newArray = new ForkJoinTask<?>[newCap];
                  } catch (OutOfMemoryError ex) {
                  }
                  if (newArray != null) {               // else throw on next push
--- 1254,42 ---
           * @param internal if caller owns this queue
           * @throws RejectedExecutionException if array could not be resized
           */
          final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
              int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
!             if ((a = array) != null && (cap = a.length) > 0) { // else disabled
                  if ((room = (m = cap - 1) - (s - b)) >= 0) {
                      top = s + 1;
                      long pos = slotOffset(m & s);
                      if (!internal)
                          U.putReference(a, pos, task);       // inside lock
                      else
                          U.getAndSetReference(a, pos, task); // fully fenced
!                     if (room == 0 && (a = growArray(a, cap, s)) != null)
!                         m = a.length - 1;                   // resize
                  }
                  if (!internal)
                      unlockPhase();
                  if (room < 0)
                      throw new RejectedExecutionException("Queue capacity exceeded");
!                 if (pool != null && a != null &&
!                     U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)
!                     pool.signalWork(a, m & s);   // may have appeared empty
              }
          }
  
          /**
           * Resizes the queue array unless out of memory.
           * @param a old array
           * @param cap old array capacity
           * @param s current top
+          * @return new array, or null on failure
           */
!         private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
!             int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
+             ForkJoinTask<?>[] newArray = null;
              if (a != null && a.length == cap && cap > 0 && newCap > 0) {
                  try {
                      newArray = new ForkJoinTask<?>[newCap];
                  } catch (OutOfMemoryError ex) {
                  }
                  if (newArray != null) {               // else throw on next push

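The "may have appeared empty" test in the new push can be isolated as
below; the AtomicReferenceArray and class name are stand-ins for the
real WorkQueue, not its implementation.

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // After storing a task at slot s, a signal is needed only if the
    // previous slot reads null, i.e. concurrent scanners could have
    // observed an empty queue.
    final class SignalHeuristicSketch {
        final AtomicReferenceArray<Runnable> a = new AtomicReferenceArray<>(64);
        int top;
        boolean pushNeedsSignal(Runnable task) {
            int m = a.length() - 1, s = top++;
            a.set(m & s, task);                    // fenced store, as in push
            return a.get(m & (s - 1)) == null;     // prior slot empty: signal
        }
    }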
*** 1327,49 ***
                          newArray[k & newMask] = u;
                      }
                      updateArray(newArray);           // fully fenced
                  }
              }
          }
  
          /**
!          * Takes next task, if one exists, in order specified by mode,
-          * so acts as either local-pop or local-poll. Called only by owner.
-          * @param fifo nonzero if FIFO mode
           */
!         private ForkJoinTask<?> nextLocalTask(int fifo) {
              ForkJoinTask<?> t = null;
!             ForkJoinTask<?>[] a = array;
!             int b = base, p = top, cap;
!             if (p - b > 0 && a != null && (cap = a.length) > 0) {
!                 for (int m = cap - 1, s, nb;;) {
!                     if (fifo == 0 || (nb = b + 1) == p) {
!                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
!                                  a, slotOffset(m & (s = p - 1)), null)) != null)
!                             updateTop(s);       // else lost race for only task
!                         break;
                      }
!                     if ((t = (ForkJoinTask<?>)U.getAndSetReference(
!                              a, slotOffset(m & b), null)) != null) {
                          updateBase(nb);
                          break;
                      }
!                     while (b == (b = U.getIntAcquire(this, BASE)))
!                         Thread.onSpinWait();    // spin to reduce memory traffic
-                     if (p - b <= 0)
-                         break;
                  }
              }
              return t;
          }
  
          /**
           * Takes next task, if one exists, using configured mode.
-          * (Always internal, never called for Common pool.)
           */
          final ForkJoinTask<?> nextLocalTask() {
!             return nextLocalTask(config & FIFO);
          }
  
          /**
           * Pops the given task only if it is at the current top.
           * @param task the task. Caller must ensure non-null.
--- 1302,59 ---
                          newArray[k & newMask] = u;
                      }
                      updateArray(newArray);           // fully fenced
                  }
              }
+             return newArray;
          }
  
          /**
!          * Takes next task, if one exists, in LIFO order.
           */
!         private ForkJoinTask<?> localPop() {
              ForkJoinTask<?> t = null;
!             int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
!             if ((a = array) != null && (cap = a.length) > 0 &&
!                 U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
!                 (t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
!                 updateTop(s);
!             return t;
!         }
! 
!         /**
+          * Takes next task, if one exists, in FIFO order.
+          */
+         private ForkJoinTask<?> localPoll() {
+             ForkJoinTask<?> t = null;
+             int p = top, cap; ForkJoinTask<?>[] a;
+             if ((a = array) != null && (cap = a.length) > 0) {
+                 for (int b = base; p - b > 0; ) {
+                     int nb = b + 1;
+                     long k = slotOffset((cap - 1) & b);
+                     if (U.getReference(a, k) == null) {
+                         if (nb == p)
+                             break;          // else base is lagging
+                         while (b == (b = U.getIntAcquire(this, BASE)))
+                             Thread.onSpinWait(); // spin to reduce memory traffic
                      }
!                     else if ((t = (ForkJoinTask<?>)
!                               U.getAndSetReference(a, k, null)) != null) {
                          updateBase(nb);
                          break;
                      }
!                     else
!                         b = base;
                  }
              }
              return t;
          }
  
          /**
           * Takes next task, if one exists, using configured mode.
           */
          final ForkJoinTask<?> nextLocalTask() {
!             return (config & FIFO) == 0 ? localPop() : localPoll();
          }
  
          /**
           * Pops the given task only if it is at the current top.
           * @param task the task. Caller must ensure non-null.

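At the API level, the lifo/fifo split above corresponds to the pool's
asyncMode parameter; a short usage example using only public
constructors:

    import java.util.concurrent.ForkJoinPool;

    // asyncMode selects FIFO processing of each worker's local tasks
    // (localPoll) instead of the default LIFO order (localPop).
    public final class ModeDemo {
        public static void main(String[] args) {
            ForkJoinPool lifo = new ForkJoinPool(4);   // default: local LIFO
            ForkJoinPool fifo = new ForkJoinPool(
                4, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null, true);                           // asyncMode: local FIFO
            lifo.shutdown();
            fifo.shutdown();
        }
    }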
*** 1446,11 ***
           * Runs the given task, as well as remaining local tasks.
           */
          final void topLevelExec(ForkJoinTask<?> task, int fifo) {
              while (task != null) {
                  task.doExec();
!                 task = nextLocalTask(fifo);
              }
          }
  
          /**
           * Deep form of tryUnpush: Traverses from top and removes and
--- 1431,11 ---
           * Runs the given task, as well as remaining local tasks.
           */
          final void topLevelExec(ForkJoinTask<?> task, int fifo) {
              while (task != null) {
                  task.doExec();
!                 task = (fifo == 0) ? localPop() : localPoll();
              }
          }
  
          /**
           * Deep form of tryUnpush: Traverses from top and removes and

*** 1576,11 ***
  
          /**
           * Cancels all local tasks. Called only by owner.
           */
          final void cancelTasks() {
!             for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
                  try {
                      t.cancel(false);
                  } catch (Throwable ignore) {
                  }
              }
--- 1561,11 ---
  
          /**
           * Cancels all local tasks. Called only by owner.
           */
          final void cancelTasks() {
!             for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
                  try {
                      t.cancel(false);
                  } catch (Throwable ignore) {
                  }
              }

*** 1778,11 ***
       * Finishes initializing and records internal queue.
       *
       * @param w caller's WorkQueue
       */
      final void registerWorker(WorkQueue w) {
!         if (w != null && (runState & STOP) == 0L) {
              ThreadLocalRandom.localInit();
              int seed = w.stackPred = ThreadLocalRandom.getProbe();
              int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
              int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
              long stop = lockRunState() & STOP;
--- 1763,12 ---
       * Finishes initializing and records internal queue.
       *
       * @param w caller's WorkQueue
       */
      final void registerWorker(WorkQueue w) {
!         if (w != null) {
+             w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
              ThreadLocalRandom.localInit();
              int seed = w.stackPred = ThreadLocalRandom.getProbe();
              int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
              int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
              long stop = lockRunState() & STOP;

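The bit manipulations in registerWorker can be previewed in isolation;
this sketch assumes, for illustration only, IDLE = 1 << 16 (the real
constant is defined elsewhere in ForkJoinPool).

    // phaseSeq clears the phase/IDLE bits of the seed, leaving a
    // version tag; id is forced odd so linear-probe scans over the
    // power-of-two queues array visit only odd (worker) slots.
    public final class PhaseBitsDemo {
        static final int IDLE  = 1 << 16;   // assumed value, demo only
        static final int SMASK = 0xffff;
        public static void main(String[] args) {
            int seed = 0x5DEECE66;                    // sample probe value
            int phaseSeq = seed & ~((IDLE << 1) - 1); // clear low tag bits
            int id = ((seed << 1) | 1) & SMASK;       // odd 16-bit scan base
            System.out.printf("phaseSeq=%08x id=%04x%n", phaseSeq, id);
        }
    }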
*** 1856,21 ***
                  unlockRunState();
              }
          }
          if ((tryTerminate(false, false) & STOP) == 0L &&
              phase != 0 && w != null && w.source != DROPPED) {
-             signalWork();                  // possibly replace
              w.cancelTasks();               // clean queue
          }
          if (ex != null)
              ForkJoinTask.rethrow(ex);
      }
  
      /**
!      * Releases an idle worker, or creates one if not enough exist.
       */
!     final void signalWork() {
          int pc = parallelism;
          for (long c = ctl;;) {
              WorkQueue[] qs = queues;
              long ac = (c + RC_UNIT) & RC_MASK, nc;
              int sp = (int)c, i = sp & SMASK;
--- 1842,22 ---
                  unlockRunState();
              }
          }
          if ((tryTerminate(false, false) & STOP) == 0L &&
              phase != 0 && w != null && w.source != DROPPED) {
              w.cancelTasks();               // clean queue
+             signalWork(null, 0);           // possibly replace
          }
          if (ex != null)
              ForkJoinTask.rethrow(ex);
      }
  
      /**
!      * Releases an idle worker, or creates one if not enough exist,
+      * giving up if array a is non-null and the task at a[k] is already taken.
       */
!     final void signalWork(ForkJoinTask<?>[] a, int k) {
          int pc = parallelism;
          for (long c = ctl;;) {
              WorkQueue[] qs = queues;
              long ac = (c + RC_UNIT) & RC_MASK, nc;
              int sp = (int)c, i = sp & SMASK;

*** 1882,17 ***
                  break;
              WorkQueue w = qs[i], v = null;
              if (sp == 0) {
                  if ((short)(c >>> TC_SHIFT) >= pc)
                      break;
!                 nc = ((c + TC_UNIT) & TC_MASK);
              }
              else if ((v = w) == null)
                  break;
              else
!                 nc = (v.stackPred & LMASK) | (c & TC_MASK);
!             if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
                  if (v == null)
                      createWorker();
                  else {
                      v.phase = sp;
                      if (v.parking != 0)
--- 1869,19 ---
                  break;
              WorkQueue w = qs[i], v = null;
              if (sp == 0) {
                  if ((short)(c >>> TC_SHIFT) >= pc)
                      break;
!                 nc = ((c + TC_UNIT) & TC_MASK) | ac;
              }
              else if ((v = w) == null)
                  break;
              else
!                 nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
!             if (a != null && k < a.length && k >= 0 && a[k] == null)
+                 break;
+             if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
                  if (v == null)
                      createWorker();
                  else {
                      v.phase = sp;
                      if (v.parking != 0)

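Field ctl, read and CASed above, packs several subfields; this hedged
demo decodes a sample value. The shift constants and the exact layout
(release count in the top 16 bits, total count in the next 16, the top
idle worker's phase in the low 32) are assumptions inferred from the
surrounding mask definitions, not part of this patch.

    public final class CtlDemo {
        static final int RC_SHIFT = 48, TC_SHIFT = 32;  // assumed shifts
        public static void main(String[] args) {
            long c = (2L << RC_SHIFT) | (5L << TC_SHIFT) | 0x00010003L;
            short rc = (short)(c >>> RC_SHIFT);  // released worker count
            short tc = (short)(c >>> TC_SHIFT);  // total worker count
            int sp = (int)c;                     // top idle phase, 0 if none
            System.out.println("rc=" + rc + " tc=" + tc + " sp=" + sp);
        }
    }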
*** 1972,161 ***
       *
       * @param w caller's WorkQueue (may be null on failed initialization)
       */
      final void runWorker(WorkQueue w) {
          if (w != null) {
!             int phase = w.phase, r = w.stackPred;     // seed from registerWorker
!             int fifo = w.config & FIFO, nsteals = 0, src = -1;
!             for (;;) {
!                 WorkQueue[] qs;
                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
!                 if ((runState & STOP) != 0L || (qs = queues) == null)
!                     break;
!                 int n = qs.length, i = r, step = (r >>> 16) | 1;
!                 boolean rescan = false;
!                 scan: for (int l = n; l > 0; --l, i += step) {  // scan queues
!                     int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
!                     if ((q = qs[j = i & (n - 1)]) != null &&
!                         (a = q.array) != null && (cap = a.length) > 0) {
!                         for (int m = cap - 1, pb = -1, b = q.base;;) {
!                             ForkJoinTask<?> t; long k;
!                             t = (ForkJoinTask<?>)U.getReferenceAcquire(
!                                 a, k = slotOffset(m & b));
!                             if (b != (b = q.base) || t == null ||
!                                 !U.compareAndSetReference(a, k, t, null)) {
!                                 if (a[b & m] == null) {
!                                     if (rescan)           // end of run
!                                         break scan;
!                                     if (a[(b + 1) & m] == null &&
!                                         a[(b + 2) & m] == null) {
!                                         break;            // probably empty
!                                     }
!                                     if (pb == (pb = b)) { // track progress
-                                         rescan = true;    // stalled; reorder scan
-                                         break scan;
-                                     }
                                  }
                              }
!                             else {
!                                 boolean propagate;
!                                 int nb = q.base = b + 1, prevSrc = src;
!                                 w.nsteals = ++nsteals;
!                                 w.source = src = j;       // volatile
!                                 rescan = true;
!                                 int nh = t.noUserHelp();
!                                 if (propagate =
!                                     (prevSrc != src || nh != 0) && a[nb & m] != null)
!                                     signalWork();
!                                 w.topLevelExec(t, fifo);
!                                 if ((b = q.base) != nb && !propagate)
-                                     break scan;          // reduce interference
                              }
                          }
                      }
                  }
-                 if (!rescan) {
-                     if (((phase = deactivate(w, phase)) & IDLE) != 0)
-                         break;
-                     src = -1;                            // re-enable propagation
-                 }
              }
          }
      }
  
      /**
!      * Deactivates and if necessary awaits signal or termination.
       *
!      * @param w the worker
!      * @param phase current phase
-      * @return current phase, with IDLE set if worker should exit
       */
!     private int deactivate(WorkQueue w, int phase) {
!         if (w == null)                        // currently impossible
!             return IDLE;
!         int p = phase | IDLE, activePhase = phase + (IDLE << 1);
!         long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
!         int sp = w.stackPred = (int)pc;       // set ctl stack link
!         w.phase = p;
!         if (!compareAndSetCtl(pc, qc))        // try to enqueue
!             return w.phase = phase;           // back out on possible signal
!         int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
!         if (((e = runState) & STOP) != 0L ||
!             ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
!             (qs = queues) == null || (n = qs.length) <= 0)
!             return IDLE;                      // terminating
! 
!         for (int prechecks = Math.min(ac, 2), // reactivation threshold
!              k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
!             WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
!             if (w.phase == activePhase)
!                 return activePhase;
!             if (--k < 0)
!                 return awaitWork(w, p);       // block, drop, or exit
!             if ((q = qs[k & (n - 1)]) == null)
!                 Thread.onSpinWait();
!             else if ((a = q.array) != null && (cap = a.length) > 0 &&
!                      a[q.base & (cap - 1)] != null && --prechecks < 0 &&
!                      (int)(c = ctl) == activePhase &&
!                      compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
!                 return w.phase = activePhase; // reactivate
          }
      }
  
      /**
       * Awaits signal or termination.
       *
       * @param w the work queue
!      * @param p current phase (known to be idle)
!      * @return current phase, with IDLE set if worker should exit
       */
!     private int awaitWork(WorkQueue w, int p) {
!         if (w != null) {
!             ForkJoinWorkerThread t; long deadline;
!             if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
!                 t.resetThreadLocals();          // clear before reactivate
!             if ((ctl & RC_MASK) > 0L)
                  deadline = 0L;
!             else if ((deadline =
!                       (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
!                       System.currentTimeMillis()) == 0L)
!                 deadline = 1L;                 // avoid zero
!             int activePhase = p + IDLE;
!             if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
!                 LockSupport.setCurrentBlocker(this);
!                 w.parking = 1;                 // enable unpark
!                 while ((p = w.phase) != activePhase) {
!                     boolean trimmable = false; int trim;
!                     Thread.interrupted();      // clear status
!                     if ((runState & STOP) != 0L)
                          break;
!                     if (deadline != 0L) {
!                         if ((trim = tryTrim(w, p, deadline)) > 0)
-                             break;
-                         else if (trim < 0)
-                             deadline = 0L;
-                         else
-                             trimmable = true;
-                     }
-                     U.park(trimmable, deadline);
                  }
!                 w.parking = 0;
!                 LockSupport.setCurrentBlocker(null);
              }
          }
!         return p;
      }
  
      /**
       * Tries to remove and deregister worker after timeout, and release
       * another to do the same.
       * @return > 0: trimmed, < 0 : not trimmable, else 0
       */
!     private int tryTrim(WorkQueue w, int phase, long deadline) {
!         long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
!         if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
              stat = -1;                      // no longer ctl top
!         else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
              stat = 0;                       // spurious wakeup
          else if (!compareAndSetCtl(
                       c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
                                 (TC_MASK & (c - TC_UNIT)))))
              stat = -1;                      // lost race to signaller
--- 1961,168 ---
       *
       * @param w caller's WorkQueue (may be null on failed initialization)
       */
      final void runWorker(WorkQueue w) {
          if (w != null) {
!             int r = w.stackPred;                          // seed from registerWorker
!             int fifo = (int)config & FIFO;
!             int nsteals = 0;                              // shadow w.nsteals
!             boolean rescan = true;
+             WorkQueue[] qs; int n;
+             while ((rescan || deactivate(w) == 0) && (runState & STOP) == 0L &&
+                    (qs = queues) != null && (n = qs.length) > 0) {
+                 rescan = false;
+                 int i = r, step = (r >>> 16) | 1;
                  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
!                 scan: for (int j = n << 1; j != 0; --j, i += step) { // 2 sweeps
!                     WorkQueue q; int qid;
!                     if ((q = qs[qid = i & (n - 1)]) != null) {
!                         for (;;) {                        // poll queue q
!                             ForkJoinTask<?>[] a; int cap, b, m, nb, nk;
!                             if ((a = q.array) == null || (cap = a.length) <= 0)
!                                 break;
!                             long bp = slotOffset((m = cap - 1) & (b = q.base));
!                             long np = slotOffset(nk = m & (nb = b + 1));
!                             ForkJoinTask<?> t = (ForkJoinTask<?>)
!                                 U.getReferenceAcquire(a, bp);
!                             if (q.array != a || q.base != b ||
!                                 U.getReference(a, bp) != t)
!                                 continue;                 // inconsistent
!                             if (t == null) {
!                                 if (rescan) {             // end of run
!                                     w.nsteals = nsteals;
!                                     break scan;
!                                 }
!                                 if (U.getReference(a, np) != null) {
!                                     rescan = true;        // stalled; reorder scan
!                                     break scan;
                                  }
+                                 break;                    // probably empty
                              }
!                             if (U.compareAndSetReference(a, bp, t, null)) {
!                                 q.base = nb;
!                                 Object nt = U.getReferenceAcquire(a, np);
!                                 if (!rescan) {            // begin run
!                                     rescan = true;
!                                     w.source = qid;
!                                 }
!                                 ++nsteals;
!                                 if (nt != null &&         // confirm a[nk]
!                                     U.getReference(a, np) == nt)
!                                     signalWork(a, nk);    // propagate
!                                 w.topLevelExec(t, fifo);  // run t & its subtasks
                              }
                          }
                      }
                  }
              }
          }
      }
  
      /**
!      * Deactivates and awaits signal or termination.
       *
!      * @param w the work queue
!      * @return zero if now active
       */
!     private int deactivate(WorkQueue w) {
!         int idle = 1;
!         if (w != null) {                        // always true; hoist checks
!             int inactive = w.phase |= IDLE;     // set status
!             int activePhase = inactive + IDLE;  // phase value when reactivated
!             long ap = activePhase & LMASK, pc = ctl, qc;
!             do {                                // enqueue
!                 qc = ap | ((pc - RC_UNIT) & UMASK);
!                 w.stackPred = (int)pc;          // set ctl stack link
!             } while (pc != (pc = compareAndExchangeCtl(pc, qc)));
! 
!             WorkQueue[] qs; int n; long e;
!             if (((e = runState) & STOP) == 0 && // quiescence checks
!                 ((e & SHUTDOWN) == 0L || (qc & RC_MASK) > 0L || quiescent() <= 0) &&
!                 (qs = queues) != null && (n = qs.length) > 1) {
!                 long psp = pc & LMASK;          // ctl predecessor prefix
!                 for (int i = 1; i < n; ++i) {   // scan; stagger origins
!                     WorkQueue q; long c;        // missed signal check
!                     if ((q = qs[(activePhase + i) & (n - 1)]) != null &&
!                         q.top - q.base > 0) {
!                         if ((idle = w.phase - activePhase) != 0 &&
!                             (int)(c = ctl) == activePhase &&
!                             compareAndSetCtl(c, psp | ((c + RC_UNIT) & UMASK))) {
!                             w.phase = activePhase;
!                             idle = 0;           // reactivated
!                         }                       // else ineligible or lost race
!                         break;
!                     }
!                 }
+                 if (idle != 0 && (idle = w.phase - activePhase) != 0)
+                     idle = awaitWork(w, activePhase, n);
+             }
          }
+         return idle;
      }
  
      /**
       * Awaits signal or termination.
       *
       * @param w the work queue
!      * @param activePhase w's next active phase
!      * @param qsize current size of queues array
+      * @return zero if now active
       */
!     private int awaitWork(WorkQueue w, int activePhase, int qsize) {
!         int idle = 1;
!         int spins = qsize | (qsize - 1);      // approx traversal cost
!         if (w != null) {                      // always true; hoist checks
!             boolean trimmable; long deadline, c;
!             long trimTime = (w.source == INVALID_ID) ? TIMEOUT_SLOP : keepAlive;
+             if ((w.config & CLEAR_TLS) != 0 && // instanceof check always true
+                 Thread.currentThread() instanceof ForkJoinWorkerThread f)
+                 f.resetThreadLocals();        // clear while accessing thread state
+             LockSupport.setCurrentBlocker(this);
+             if (trimmable = (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase))
+                 deadline = trimTime + System.currentTimeMillis();
+             else
                  deadline = 0L;
!             for (;;) {
!                 int s = spins, trim;
!                 Thread.interrupted();         // clear status
!                 if ((runState & STOP) != 0L)
!                     break;
!                 while ((idle = w.phase - activePhase) != 0 && --s != 0)
!                     Thread.onSpinWait();      // spin before blocking
!                 if (idle == 0)
!                     break;
!                 if (trimmable &&
!                     (trim = tryTrim(w, activePhase, deadline)) != 0) {
!                     if (trim > 0)
                          break;
!                     trimmable = false;
!                     deadline = 0L;
                  }
!                 w.parking = 1;                // enable unpark and recheck
!                 if ((idle = w.phase - activePhase) != 0)
+                     U.park(trimmable, deadline);
+                 w.parking = 0;                // close unpark window
+                 if (idle == 0 || (idle = w.phase - activePhase) == 0)
+                     break;
              }
+             LockSupport.setCurrentBlocker(null);
          }
!         return idle;
      }
  
      /**
       * Tries to remove and deregister worker after timeout, and release
       * another to do the same.
       * @return > 0: trimmed, < 0 : not trimmable, else 0
       */
!     private int tryTrim(WorkQueue w, int activePhase, long deadline) {
!         long c, nc; int stat, vp, i; WorkQueue[] vs; WorkQueue v;
!         long waitTime = deadline - System.currentTimeMillis();
+         if ((int)(c = ctl) != activePhase || w == null)
              stat = -1;                      // no longer ctl top
!         else if (waitTime > TIMEOUT_SLOP)
              stat = 0;                       // spurious wakeup
          else if (!compareAndSetCtl(
                       c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
                                 (TC_MASK & (c - TC_UNIT)))))
              stat = -1;                      // lost race to signaller

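The deactivation protocol above threads idle workers into a Treiber
stack through the low half of ctl, linked by stackPred. A stripped-down
sketch with hypothetical names; the real word also carries the RC/TC
counts, and correctness depends on the rescans shown above.

    import java.util.concurrent.atomic.AtomicLong;

    final class IdleStackSketch {
        static final long LMASK = 0xffffffffL;
        final AtomicLong ctl = new AtomicLong();
        final int[] stackPred = new int[64];       // per-worker links

        void deactivate(int phase) {               // nonzero idle phase tag
            long c;
            do {
                c = ctl.get();
                stackPred[phase & 63] = (int)c;    // link to previous top
            } while (!ctl.compareAndSet(c, (c & ~LMASK) | (phase & LMASK)));
        }
        int signal() {                             // pop one idle worker
            long c; int sp;
            do {
                if ((sp = (int)(c = ctl.get())) == 0)
                    return 0;                      // stack empty
            } while (!ctl.compareAndSet(
                         c, (c & ~LMASK) | (stackPred[sp & 63] & LMASK)));
            return sp;                             // reactivated worker tag
        }
    }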
*** 2559,56 ***
  
      // External operations
  
      /**
       * Finds and locks a WorkQueue for an external submitter, or
!      * throws RejectedExecutionException if shutdown or terminating.
-      * @param r current ThreadLocalRandom.getProbe() value
       * @param rejectOnShutdown true if RejectedExecutionException
!      *        should be thrown when shutdown (else only if terminating)
       */
!     private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
!         int reuse;                                   // nonzero if prefer create
!         if ((reuse = r) == 0) {
!             ThreadLocalRandom.localInit();           // initialize caller's probe
              r = ThreadLocalRandom.getProbe();
          }
!         for (int probes = 0; ; ++probes) {
!             int n, i, id; WorkQueue[] qs; WorkQueue q;
!             if ((qs = queues) == null)
-                 break;
-             if ((n = qs.length) <= 0)
                  break;
              if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
!                 WorkQueue w = new WorkQueue(null, id, 0, false);
!                 w.phase = id;
!                 boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
!                                   rejectOnShutdown);
-                 if (!reject && queues == qs && qs[i] == null)
-                     q = qs[i] = w;                   // else lost race to install
                  unlockRunState();
-                 if (q != null)
-                     return q;
-                 if (reject)
-                     break;
-                 reuse = 0;
              }
!             if (reuse == 0 || !q.tryLockPhase()) {   // move index
!                 if (reuse == 0) {
!                     if (probes >= n >> 1)
!                         reuse = r;                   // stop prefering free slot
                  }
-                 else if (q != null)
-                     reuse = 0;                       // probe on collision
-                 r = ThreadLocalRandom.advanceProbe(r);
-             }
-             else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
-                 q.unlockPhase();                     // check while q lock held
-                 break;
-             }
-             else
                  return q;
          }
          throw new RejectedExecutionException();
      }
  
      private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
--- 2555,39 ---
  
      // External operations
  
      /**
       * Finds and locks a WorkQueue for an external submitter, or
!      * throws RejectedExecutionException if shutdown.
       * @param rejectOnShutdown true if RejectedExecutionException
!      *        should be thrown when shutdown
       */
!     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
!         int r;
!         if ((r = ThreadLocalRandom.getProbe()) == 0) {
!             ThreadLocalRandom.localInit();   // initialize caller's probe
              r = ThreadLocalRandom.getProbe();
          }
!         for (;;) {
!             WorkQueue q; WorkQueue[] qs; int n, id, i;
!             if ((qs = queues) == null || (n = qs.length) <= 0)
                  break;
              if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
!                 WorkQueue newq = new WorkQueue(null, id, 0, false);
!                 lockRunState();
!                 if (qs[i] == null && queues == qs)
!                     q = qs[i] = newq;         // else lost race to install
                  unlockRunState();
              }
!             if (q != null && q.tryLockPhase()) {
!                 if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
!                     q.unlockPhase();          // check while q lock held
!                     break;
                  }
                  return q;
+             }
+             r = ThreadLocalRandom.advanceProbe(r); // move
          }
          throw new RejectedExecutionException();
      }
  
      private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {

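The probe-hashed scheme in externalSubmissionQueue can be mimicked
outside the pool as follows; locked ArrayDeques and a thread-id hash
are stand-ins for WorkQueue phase locks and ThreadLocalRandom probes,
and the rehash line merely imitates advanceProbe.

    import java.util.ArrayDeque;
    import java.util.concurrent.atomic.AtomicReferenceArray;
    import java.util.concurrent.locks.ReentrantLock;

    // A thread hashes to a slot of a power-of-two table, installs a
    // queue if the slot is empty, and rehashes on lock collision.
    final class StripedSubmitSketch {
        static final class Slot {
            final ReentrantLock lock = new ReentrantLock();
            final ArrayDeque<Runnable> tasks = new ArrayDeque<>();
        }
        final AtomicReferenceArray<Slot> qs = new AtomicReferenceArray<>(16);

        void submit(Runnable task) {
            int r = (int)Thread.currentThread().threadId() * 0x9E3779B9;
            for (;;) {
                int i = r & (qs.length() - 1);
                Slot q = qs.get(i);
                if (q == null)
                    qs.compareAndSet(i, null, new Slot()); // install; retry
                else if (q.lock.tryLock()) {
                    try { q.tasks.push(task); } finally { q.lock.unlock(); }
                    return;
                }
                else
                    r = (r * 0x5DEECE66) | 1;              // "advance probe"
            }
        }
    }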
*** 2618,28 ***
              internal = true;
              q = wt.workQueue;
          }
          else {                     // find and lock queue
              internal = false;
!             q = submissionQueue(ThreadLocalRandom.getProbe(), true);
          }
          q.push(task, signalIfEmpty ? this : null, internal);
          return task;
      }
  
-     /**
-      * Returns queue for an external submission, bypassing call to
-      * submissionQueue if already established and unlocked.
-      */
-     final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
-         WorkQueue[] qs; WorkQueue q; int n;
-         int r = ThreadLocalRandom.getProbe();
-         return (((qs = queues) != null && (n = qs.length) > 0 &&
-                  (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
-                  q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
-     }
- 
      /**
       * Returns queue for an external thread, if one exists that has
       * possibly ever submitted to the given pool (nonzero probe), or
       * null if none.
       */
--- 2597,16 ---
              internal = true;
              q = wt.workQueue;
          }
          else {                     // find and lock queue
              internal = false;
!             q = externalSubmissionQueue(true);
          }
          q.push(task, signalIfEmpty ? this : null, internal);
          return task;
      }
  
      /**
       * Returns queue for an external thread, if one exists that has
       * possibly ever submitted to the given pool (nonzero probe), or
       * null if none.
       */