1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.reflect.Field;
  40 import java.security.AccessController;
  41 import java.security.AccessControlContext;
  42 import java.security.Permission;
  43 import java.security.Permissions;
  44 import java.security.PrivilegedAction;
  45 import java.security.ProtectionDomain;
  46 import java.util.ArrayList;
  47 import java.util.Collection;
  48 import java.util.Collections;
  49 import java.util.List;
  50 import java.util.Objects;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.CountDownLatch;
  53 import java.util.concurrent.locks.LockSupport;
  54 import jdk.internal.access.JavaUtilConcurrentFJPAccess;
  55 import jdk.internal.access.SharedSecrets;
  56 import jdk.internal.misc.Unsafe;
  57 import jdk.internal.vm.SharedThreadContainer;
  58 import jdk.internal.vm.annotation.DontInline;
  59 
  60 /**
  61  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  62  * A {@code ForkJoinPool} provides the entry point for submissions
  63  * from non-{@code ForkJoinTask} clients, as well as management and
  64  * monitoring operations.
  65  *
  66  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  67  * ExecutorService} mainly by virtue of employing
  68  * <em>work-stealing</em>: all threads in the pool attempt to find and
  69  * execute tasks submitted to the pool and/or created by other active
  70  * tasks (eventually blocking waiting for work if none exist). This
  71  * enables efficient processing when most tasks spawn other subtasks
  72  * (as do most {@code ForkJoinTask}s), as well as when many small
  73  * tasks are submitted to the pool from external clients.  Especially
  74  * when setting <em>asyncMode</em> to true in constructors, {@code
  75  * ForkJoinPool}s may also be appropriate for use with event-style
  76  * tasks that are never joined. All worker threads are initialized
  77  * with {@link Thread#isDaemon} set {@code true}.
  78  *
  79  * <p>A static {@link #commonPool()} is available and appropriate for
  80  * most applications. The common pool is used by any ForkJoinTask that
  81  * is not explicitly submitted to a specified pool. Using the common
  82  * pool normally reduces resource usage (its threads are slowly
  83  * reclaimed during periods of non-use, and reinstated upon subsequent
  84  * use).
  85  *
  86  * <p>For applications that require separate or custom pools, a {@code
  87  * ForkJoinPool} may be constructed with a given target parallelism
  88  * level; by default, equal to the number of available processors.
  89  * The pool attempts to maintain enough active (or available) threads
  90  * by dynamically adding, suspending, or resuming internal worker
  91  * threads, even if some tasks are stalled waiting to join others.
  92  * However, no such adjustments are guaranteed in the face of blocked
  93  * I/O or other unmanaged synchronization. The nested {@link
  94  * ManagedBlocker} interface enables extension of the kinds of
  95  * synchronization accommodated. The default policies may be
  96  * overridden using a constructor with parameters corresponding to
  97  * those documented in class {@link ThreadPoolExecutor}.
  98  *
  99  * <p>In addition to execution and lifecycle control methods, this
 100  * class provides status check methods (for example
 101  * {@link #getStealCount}) that are intended to aid in developing,
 102  * tuning, and monitoring fork/join applications. Also, method
 103  * {@link #toString} returns indications of pool state in a
 104  * convenient form for informal monitoring.
 105  *
 106  * <p>As is the case with other ExecutorServices, there are three
 107  * main task execution methods summarized in the following table.
 108  * These are designed to be used primarily by clients not already
 109  * engaged in fork/join computations in the current pool.  The main
 110  * forms of these methods accept instances of {@code ForkJoinTask},
 111  * but overloaded forms also allow mixed execution of plain {@code
 112  * Runnable}- or {@code Callable}- based activities as well.  However,
 113  * tasks that are already executing in a pool should normally instead
 114  * use the within-computation forms listed in the table unless using
 115  * async event-style tasks that are not usually joined, in which case
 116  * there is little difference among choice of methods.
 117  *
 118  * <table class="plain">
 119  * <caption>Summary of task execution methods</caption>
 120  *  <tr>
 121  *    <td></td>
 122  *    <th scope="col"> Call from non-fork/join clients</th>
 123  *    <th scope="col"> Call from within fork/join computations</th>
 124  *  </tr>
 125  *  <tr>
 126  *    <th scope="row" style="text-align:left"> Arrange async execution</th>
 127  *    <td> {@link #execute(ForkJoinTask)}</td>
 128  *    <td> {@link ForkJoinTask#fork}</td>
 129  *  </tr>
 130  *  <tr>
 131  *    <th scope="row" style="text-align:left"> Await and obtain result</th>
 132  *    <td> {@link #invoke(ForkJoinTask)}</td>
 133  *    <td> {@link ForkJoinTask#invoke}</td>
 134  *  </tr>
 135  *  <tr>
 136  *    <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
 137  *    <td> {@link #submit(ForkJoinTask)}</td>
 138  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 139  *  </tr>
 140  * </table>
 141  *
 142  * <p>The parameters used to construct the common pool may be controlled by
 143  * setting the following {@linkplain System#getProperty system properties}:
 144  * <ul>
 145  * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
 146  * - the parallelism level, a non-negative integer
 147  * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
 148  * - the class name of a {@link ForkJoinWorkerThreadFactory}.
 149  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 150  * is used to load this class.
 151  * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.exceptionHandler}
 152  * - the class name of a {@link UncaughtExceptionHandler}.
 153  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 154  * is used to load this class.
 155  * <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.maximumSpares}
 156  * - the maximum number of allowed extra threads to maintain target
 157  * parallelism (default 256).
 158  * </ul>
 159  * If no thread factory is supplied via a system property, then the
 160  * common pool uses a factory that uses the system class loader as the
 161  * {@linkplain Thread#getContextClassLoader() thread context class loader}.
 162  * In addition, if a {@link SecurityManager} is present, then
 163  * the common pool uses a factory supplying threads that have no
 164  * {@link Permissions} enabled, and are not guaranteed to preserve
 165  * the values of {@link java.lang.ThreadLocal} variables across tasks.
 166  *
 167  * Upon any error in establishing these settings, default parameters
 168  * are used. It is possible to disable or limit the use of threads in
 169  * the common pool by setting the parallelism property to zero, and/or
 170  * using a factory that may return {@code null}. However doing so may
 171  * cause unjoined tasks to never be executed.
 172  *
 173  * @implNote This implementation restricts the maximum number of
 174  * running threads to 32767. Attempts to create pools with greater
 175  * than the maximum number result in {@code
 176  * IllegalArgumentException}. Also, this implementation rejects
 177  * submitted tasks (that is, by throwing {@link
 178  * RejectedExecutionException}) only when the pool is shut down or
 179  * internal resources have been exhausted.
 180  *
 181  * @since 1.7
 182  * @author Doug Lea
 183  */
 184 public class ForkJoinPool extends AbstractExecutorService {
 185 
 186     /*
 187      * Implementation Overview -- omitted until stable
 188      *
 189      */
 190 
 191     // static configuration constants
 192 
 193     /**
 194      * Default idle timeout value (in milliseconds) for idle threads
 195      * to park waiting for new work before terminating.
 196      */
 197     static final long DEFAULT_KEEPALIVE = 60_000L;
 198 
 199     /**
 200      * Undershoot tolerance for idle timeouts
 201      */
 202     static final long TIMEOUT_SLOP = 20L;
 203 
 204     /**
 205      * The default value for common pool maxSpares.  Overridable using
 206      * the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
 207      * system property.  The default value is far in excess of normal
 208      * requirements, but also far short of maximum capacity and typical OS
 209      * thread limits, so allows JVMs to catch misuse/abuse before
 210      * running out of resources needed to do so.
 211      */
 212     static final int DEFAULT_COMMON_MAX_SPARES = 256;
 213 
 214     /**
 215      * Initial capacity of work-stealing queue array.  Must be a power
 216      * of two, at least 2. See above.
 217      */
 218     static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
 219 
 220     // conversions among short, int, long
 221     static final int  SMASK           = 0xffff;      // (unsigned) short bits
 222     static final long LMASK           = 0xffffffffL; // lower 32 bits of long
 223     static final long UMASK           = ~LMASK;      // upper 32 bits
 224 
 225     // masks and sentinels for queue indices
 226     static final int MAX_CAP          = 0x7fff;   // max # workers
 227     static final int EXTERNAL_ID_MASK = 0x3ffe;   // max external queue id
 228     static final int INVALID_ID       = 0x4000;   // unused external queue id
 229 
 230     // pool.runState bits
 231     static final int STOP             = 1 <<  0;   // terminating
 232     static final int SHUTDOWN         = 1 <<  1;   // terminate when quiescent
 233     static final int TERMINATED       = 1 <<  2;   // only set if STOP also set
 234     static final int RS_LOCK          = 1 <<  3;   // lowest seqlock bit
 235 
 236     // spin/sleep limits for runState locking and elsewhere
 237     static final int SPIN_WAITS       = 1 <<  7;   // max calls to onSpinWait
 238     static final int MIN_SLEEP        = 1 << 10;   // approx 1 usec as nanos
 239     static final int MAX_SLEEP        = 1 << 20;   // approx 1 sec  as nanos
 240 
 241     // {pool, workQueue} config bits
 242     static final int FIFO             = 1 << 0;   // fifo queue or access mode
 243     static final int CLEAR_TLS        = 1 << 1;   // set for Innocuous workers
 244     static final int PRESET_SIZE      = 1 << 2;   // size was set by property
 245 
 246     // others
 247     static final int DEREGISTERED     = 1 << 31;  // worker terminating
 248     static final int UNCOMPENSATE     = 1 << 16;  // tryCompensate return
 249     static final int IDLE             = 1 << 16;  // phase seqlock/version count
 250 
 251     /*
 252      * Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
 253      * RC: Number of released (unqueued) workers
 254      * TC: Number of total workers
 255      * SS: version count and status of top waiting thread
 256      * ID: poolIndex of top of Treiber stack of waiters
 257      *
 258      * When convenient, we can extract the lower 32 stack top bits
 259      * (including version bits) as sp=(int)ctl. When sp is non-zero,
 260      * there are waiting workers.  Count fields may be transiently
 261      * negative during termination because of out-of-order updates.
 262      * To deal with this, we use casts in and out of "short" and/or
 263      * signed shifts to maintain signedness. Because it occupies
 264      * uppermost bits, we can add one release count using getAndAdd of
 265      * RC_UNIT, rather than CAS, when returning from a blocked join.
 266      * Other updates of multiple subfields require CAS.
 267      */
 268 
 269     // Release counts
 270     static final int  RC_SHIFT = 48;
 271     static final long RC_UNIT  = 0x0001L << RC_SHIFT;
 272     static final long RC_MASK  = 0xffffL << RC_SHIFT;
 273     // Total counts
 274     static final int  TC_SHIFT = 32;
 275     static final long TC_UNIT  = 0x0001L << TC_SHIFT;
 276     static final long TC_MASK  = 0xffffL << TC_SHIFT;
 277 
 278     /*
 279      * All atomic operations on task arrays (queues) use Unsafe
 280      * operations that take array offsets versus indices, based on
 281      * array base and shift constants established during static
 282      * initialization.
 283      */
 284     static final long ABASE;
 285     static final int  ASHIFT;
 286 
 287     // Static utilities
 288 
 289     /**
 290      * Returns the array offset corresponding to the given index for
 291      * Unsafe task queue operations
 292      */
 293     static long slotOffset(int index) {
 294         return ((long)index << ASHIFT) + ABASE;
 295     }
 296 
 297     /**
 298      * If there is a security manager, makes sure caller has
 299      * permission to modify threads.
 300      */
 301     @SuppressWarnings("removal")
 302     private static void checkPermission() {
 303         SecurityManager security; RuntimePermission perm;
 304         if ((security = System.getSecurityManager()) != null) {
 305             if ((perm = modifyThreadPermission) == null)
 306                 modifyThreadPermission = perm = // races OK
 307                     new RuntimePermission("modifyThread");
 308             security.checkPermission(perm);
 309         }
 310     }
 311 
 312     // Nested classes
 313 
 314     /**
 315      * Factory for creating new {@link ForkJoinWorkerThread}s.
 316      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
 317      * for {@code ForkJoinWorkerThread} subclasses that extend base
 318      * functionality or initialize threads with different contexts.
 319      */
 320     public static interface ForkJoinWorkerThreadFactory {
 321         /**
 322          * Returns a new worker thread operating in the given pool.
 323          * Returning null or throwing an exception may result in tasks
 324          * never being executed.  If this method throws an exception,
 325          * it is relayed to the caller of the method (for example
 326          * {@code execute}) causing attempted thread creation. If this
 327          * method returns null or throws an exception, it is not
 328          * retried until the next attempted creation (for example
 329          * another call to {@code execute}).
 330          *
 331          * @param pool the pool this thread works in
 332          * @return the new worker thread, or {@code null} if the request
 333          *         to create a thread is rejected
 334          * @throws NullPointerException if the pool is null
 335          */
 336         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
 337     }
 338 
 339     /**
 340      * Default ForkJoinWorkerThreadFactory implementation; creates a
 341      * new ForkJoinWorkerThread using the system class loader as the
 342      * thread context class loader.
 343      */
 344     static final class DefaultForkJoinWorkerThreadFactory
 345         implements ForkJoinWorkerThreadFactory {
 346         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 347             boolean isCommon = (pool.workerNamePrefix == null);
 348             @SuppressWarnings("removal")
 349             SecurityManager sm = System.getSecurityManager();
 350             if (sm == null)
 351                 return new ForkJoinWorkerThread(null, pool, true, false);
 352             else if (isCommon)
 353                 return newCommonWithACC(pool);
 354             else
 355                 return newRegularWithACC(pool);
 356         }
 357 
 358         /*
 359          * Create and use static AccessControlContexts only if there
 360          * is a SecurityManager. (These can be removed if/when
 361          * SecurityManagers are removed from platform.) The ACCs are
 362          * immutable and equivalent even when racily initialized, so
 363          * they don't require locking, although with the chance of
 364          * needlessly duplicate construction.
 365          */
 366         @SuppressWarnings("removal")
 367         static volatile AccessControlContext regularACC, commonACC;
 368 
 369         @SuppressWarnings("removal")
 370         static ForkJoinWorkerThread newRegularWithACC(ForkJoinPool pool) {
 371             AccessControlContext acc = regularACC;
 372             if (acc == null) {
 373                 Permissions ps = new Permissions();
 374                 ps.add(new RuntimePermission("getClassLoader"));
 375                 ps.add(new RuntimePermission("setContextClassLoader"));
 376                 regularACC = acc =
 377                     new AccessControlContext(new ProtectionDomain[] {
 378                             new ProtectionDomain(null, ps) });
 379             }
 380             return AccessController.doPrivileged(
 381                 new PrivilegedAction<>() {
 382                     public ForkJoinWorkerThread run() {
 383                         return new ForkJoinWorkerThread(null, pool, true, false);
 384                     }}, acc);
 385         }
 386 
 387         @SuppressWarnings("removal")
 388         static ForkJoinWorkerThread newCommonWithACC(ForkJoinPool pool) {
 389             AccessControlContext acc = commonACC;
 390             if (acc == null) {
 391                 Permissions ps = new Permissions();
 392                 ps.add(new RuntimePermission("getClassLoader"));
 393                 ps.add(new RuntimePermission("setContextClassLoader"));
 394                 ps.add(new RuntimePermission("modifyThread"));
 395                 ps.add(new RuntimePermission("enableContextClassLoaderOverride"));
 396                 ps.add(new RuntimePermission("modifyThreadGroup"));
 397                 commonACC = acc =
 398                     new AccessControlContext(new ProtectionDomain[] {
 399                             new ProtectionDomain(null, ps) });
 400             }
 401             return AccessController.doPrivileged(
 402                 new PrivilegedAction<>() {
 403                     public ForkJoinWorkerThread run() {
 404                         return new ForkJoinWorkerThread.
 405                             InnocuousForkJoinWorkerThread(pool);
 406                     }}, acc);
 407         }
 408     }
 409 
 410     /**
 411      * Queues supporting work-stealing as well as external task
 412      * submission. See above for descriptions and algorithms.
 413      */
 414     static final class WorkQueue {
 415         // fields declared in order of their likely layout on most VMs
 416         final ForkJoinWorkerThread owner; // null if shared
 417         ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
 418         int base;                  // index of next slot for poll
 419         final int config;          // mode bits
 420 
 421         // fields otherwise causing more unnecessary false-sharing cache misses
 422         @jdk.internal.vm.annotation.Contended("w")
 423         int top;                   // index of next slot for push
 424         @jdk.internal.vm.annotation.Contended("w")
 425         volatile int phase;        // versioned active status
 426         @jdk.internal.vm.annotation.Contended("w")
 427         int stackPred;             // pool stack (ctl) predecessor link
 428         @jdk.internal.vm.annotation.Contended("w")
 429         volatile int source;       // source queue id (or DEREGISTERED)
 430         @jdk.internal.vm.annotation.Contended("w")
 431         int nsteals;               // number of steals from other queues
 432         @jdk.internal.vm.annotation.Contended("w")
 433         volatile int parking;      // nonzero if parked in awaitWork
 434 
 435         // Support for atomic operations
 436         private static final Unsafe U;
 437         private static final long PHASE;
 438         private static final long BASE;
 439         private static final long TOP;
 440         private static final long ARRAY;
 441 
 442         final void updateBase(int v) {
 443             U.putIntVolatile(this, BASE, v);
 444         }
 445         final void updateTop(int v) {
 446             U.putIntOpaque(this, TOP, v);
 447         }
 448         final void updateArray(ForkJoinTask<?>[] a) {
 449             U.getAndSetReference(this, ARRAY, a);
 450         }
 451         final void unlockPhase() {
 452             U.getAndAddInt(this, PHASE, IDLE);
 453         }
 454         final boolean tryLockPhase() {    // seqlock acquire
 455             int p;
 456             return (((p = phase) & IDLE) != 0 &&
 457                     U.compareAndSetInt(this, PHASE, p, p + IDLE));
 458         }
 459 
 460         /**
 461          * Constructor. For internal queues, most fields are initialized
 462          * upon thread start in pool.registerWorker.
 463          */
 464         WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
 465                   boolean clearThreadLocals) {
 466             if (clearThreadLocals)
 467                 cfg |= CLEAR_TLS;
 468             this.config = cfg;
 469             top = base = 1;
 470             this.phase = id;
 471             this.owner = owner;
 472         }
 473 
 474         /**
 475          * Returns an exportable index (used by ForkJoinWorkerThread).
 476          */
 477         final int getPoolIndex() {
 478             return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
 479         }
 480 
 481         /**
 482          * Returns the approximate number of tasks in the queue.
 483          */
 484         final int queueSize() {
 485             int unused = phase;             // for ordering effect
 486             return Math.max(top - base, 0); // ignore transient negative
 487         }
 488 
 489         /**
 490          * Pushes a task. Called only by owner or if already locked
 491          *
 492          * @param task the task. Caller must ensure non-null.
 493          * @param pool the pool to signal if was previously empty, else null
 494          * @param internal if caller owns this queue
 495          * @throws RejectedExecutionException if array could not be resized
 496          */
 497         final void push(ForkJoinTask<?> task, ForkJoinPool pool,
 498                         boolean internal) {
 499             int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask<?>[] a;
 500             if ((a = array) == null || (cap = a.length) <= 0 ||
 501                 (room = (m = cap - 1) - (s - b)) < 0) { // could not resize
 502                 if (!internal)
 503                     unlockPhase();
 504                 throw new RejectedExecutionException("Queue capacity exceeded");
 505             }
 506             top = s + 1;
 507             long pos = slotOffset(p = m & s);
 508             if (!internal)
 509                 U.putReference(a, pos, task);         // inside lock
 510             else
 511                 U.getAndSetReference(a, pos, task);   // fully fenced
 512             if (room == 0 && (newCap = cap << 1) > 0) {
 513                 ForkJoinTask<?>[] newArray = null;
 514                 try {                                 // resize for next time
 515                     newArray = new ForkJoinTask<?>[newCap];
 516                 } catch (OutOfMemoryError ex) {
 517                 }
 518                 if (newArray != null) {               // else throw on next push
 519                     int newMask = newCap - 1;         // poll old, push to new
 520                     p = newMask & s;
 521                     for (int k = s, j = cap; j > 0; --j, --k) {
 522                         ForkJoinTask<?> u;
 523                         if ((u = (ForkJoinTask<?>)U.getAndSetReference(
 524                                  a, slotOffset(k & m), null)) == null)
 525                             break;                    // lost to pollers
 526                         newArray[k & newMask] = u;
 527                     }
 528                     updateArray(a = newArray);        // fully fenced
 529                 }
 530             }
 531             if (!internal)
 532                 unlockPhase();
 533             if ((room == 0 || a[m & (s - 1)] == null) && pool != null)
 534                 pool.signalWork(a, p);
 535         }
 536 
 537         /**
 538          * Takes next task, if one exists, in order specified by mode,
 539          * so acts as either local-pop or local-poll. Called only by owner.
 540          * @param fifo nonzero if FIFO mode
 541          */
 542         private ForkJoinTask<?> nextLocalTask(int fifo) {
 543             ForkJoinTask<?> t = null;
 544             ForkJoinTask<?>[] a = array;
 545             int b = base, p = top, cap;
 546             if (a != null && (cap = a.length) > 0) {
 547                 for (int m = cap - 1, s, nb; p - b > 0; ) {
 548                     if (fifo == 0 || (nb = b + 1) == p) {
 549                         if ((t = (ForkJoinTask<?>)U.getAndSetReference(
 550                                  a, slotOffset(m & (s = p - 1)), null)) != null)
 551                             updateTop(s);       // else lost race for only task
 552                         break;
 553                     }
 554                     if ((t = (ForkJoinTask<?>)U.getAndSetReference(
 555                              a, slotOffset(m & b), null)) != null) {
 556                         updateBase(nb);
 557                         break;
 558                     }
 559                     while (b == (b = base)) {
 560                         U.loadFence();
 561                         Thread.onSpinWait();    // spin to reduce memory traffic
 562                     }
 563                 }
 564             }
 565             return t;
 566         }
 567 
 568         /**
 569          * Takes next task, if one exists, using configured mode.
 570          * (Always internal, never called for Common pool.)
 571          */
 572         final ForkJoinTask<?> nextLocalTask() {
 573             return nextLocalTask(config & FIFO);
 574         }
 575 
 576         /**
 577          * Pops the given task only if it is at the current top.
 578          * @param task the task. Caller must ensure non-null.
 579          * @param internal if caller owns this queue
 580          */
 581         final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
 582             boolean taken = false;
 583             ForkJoinTask<?>[] a = array;
 584             int p = top, s = p - 1, cap, k;
 585             if (a != null && (cap = a.length) > 0 &&
 586                 a[k = (cap - 1) & s] == task &&
 587                 (internal || tryLockPhase())) {
 588                 if (top == p &&
 589                     U.compareAndSetReference(a, slotOffset(k), task, null)) {
 590                     taken = true;
 591                     updateTop(s);
 592                 }
 593                 if (!internal)
 594                     unlockPhase();
 595             }
 596             return taken;
 597         }
 598 
 599         /**
 600          * Returns next task, if one exists, in order specified by mode.
 601          */
 602         final ForkJoinTask<?> peek() {
 603             ForkJoinTask<?>[] a = array;
 604             int b = base, cfg = config, p = top, cap;
 605             if (p != b && a != null && (cap = a.length) > 0) {
 606                 if ((cfg & FIFO) == 0)
 607                     return a[(cap - 1) & (p - 1)];
 608                 else { // skip over in-progress removals
 609                     ForkJoinTask<?> t;
 610                     for ( ; p - b > 0; ++b) {
 611                         if ((t = a[(cap - 1) & b]) != null)
 612                             return t;
 613                     }
 614                 }
 615             }
 616             return null;
 617         }
 618 
 619         /**
 620          * Polls for a task. Used only by non-owners.
 621          *
 622          */
 623         final ForkJoinTask<?> poll() {
 624             for (;;) {
 625                 ForkJoinTask<?>[] a = array;
 626                 int b = base, cap, k;
 627                 if (a == null || (cap = a.length) <= 0)
 628                     break;
 629                 ForkJoinTask<?> t = a[k = b & (cap - 1)];
 630                 U.loadFence();
 631                 if (base == b) {
 632                     Object o;
 633                     int nb = b + 1, nk = nb & (cap - 1);
 634                     if (t == null)
 635                         o = a[k];
 636                     else if (t == (o = U.compareAndExchangeReference(
 637                                        a, slotOffset(k), t, null))) {
 638                         updateBase(nb);
 639                         return t;
 640                     }
 641                     if (o == null && a[nk] == null && array == a &&
 642                         (phase & (IDLE | 1)) != 0 && top - base <= 0)
 643                         break;                    // empty
 644                 }
 645             }
 646             return null;
 647         }
 648 
 649         // specialized execution methods
 650 
 651         /**
 652          * Runs the given task, as well as remaining local tasks
 653          */
 654         @DontInline // avoid dispatch in caller method (scan)
 655         final void topLevelExec(ForkJoinTask<?> task, int cfg) {
 656             if ((cfg & CLEAR_TLS) != 0)
 657                 ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
 658             int fifo = cfg & FIFO;
 659             while (task != null) {
 660                 task.doExec();
 661                 task = nextLocalTask(fifo);
 662             }
 663         }
 664 
 665         /**
 666          * Deep form of tryUnpush: Traverses from top and removes and
 667          * runs task if present.
 668          */
 669         final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
 670             ForkJoinTask<?>[] a = array;
 671             int b = base, p = top, s = p - 1, d = p - b, cap;
 672             if (a != null && (cap = a.length) > 0) {
 673                 for (int m = cap - 1, i = s; d > 0; --i, --d) {
 674                     ForkJoinTask<?> t; int k; boolean taken;
 675                     if ((t = a[k = i & m]) == null)
 676                         break;
 677                     if (t == task) {
 678                         long pos = slotOffset(k);
 679                         if (!internal && !tryLockPhase())
 680                             break;                  // fail if locked
 681                         if (taken =
 682                             (top == p &&
 683                              U.compareAndSetReference(a, pos, task, null))) {
 684                             if (i == s)             // act as pop
 685                                 updateTop(s);
 686                             else if (i == base)     // act as poll
 687                                 updateBase(i + 1);
 688                             else {                  // swap with top
 689                                 U.putReferenceVolatile(
 690                                     a, pos, (ForkJoinTask<?>)
 691                                     U.getAndSetReference(
 692                                         a, slotOffset(s & m), null));
 693                                 updateTop(s);
 694                             }
 695                         }
 696                         if (!internal)
 697                             unlockPhase();
 698                         if (taken)
 699                             task.doExec();
 700                         break;
 701                     }
 702                 }
 703             }
 704         }
 705 
 706         /**
 707          * Tries to pop and run tasks within the target's computation
 708          * until done, not found, or limit exceeded.
 709          *
 710          * @param task root of computation
 711          * @param limit max runs, or zero for no limit
 712          * @return task status if known to be done
 713          */
 714         final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) {
 715             int status = 0;
 716             if (task != null) {
 717                 outer: for (;;) {
 718                     ForkJoinTask<?>[] a; ForkJoinTask<?> t; boolean taken;
 719                     int stat, p, s, cap, k;
 720                     if ((stat = task.status) < 0) {
 721                         status = stat;
 722                         break;
 723                     }
 724                     if ((a = array) == null || (cap = a.length) <= 0)
 725                         break;
 726                     if ((t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null)
 727                         break;
 728                     if (!(t instanceof CountedCompleter))
 729                         break;
 730                     CountedCompleter<?> f = (CountedCompleter<?>)t;
 731                     for (int steps = cap;;) {       // bound path
 732                         if (f == task)
 733                             break;
 734                         if ((f = f.completer) == null || --steps == 0)
 735                             break outer;
 736                     }
 737                     if (!internal && !tryLockPhase())
 738                         break;
 739                     if (taken =
 740                         (top == p &&
 741                          U.compareAndSetReference(a, slotOffset(k), t, null)))
 742                         updateTop(s);
 743                     if (!internal)
 744                         unlockPhase();
 745                     if (!taken)
 746                         break;
 747                     t.doExec();
 748                     if (limit != 0 && --limit == 0)
 749                         break;
 750                 }
 751             }
 752             return status;
 753         }
 754 
 755         /**
 756          * Tries to poll and run AsynchronousCompletionTasks until
 757          * none found or blocker is released
 758          *
 759          * @param blocker the blocker
 760          */
 761         final void helpAsyncBlocker(ManagedBlocker blocker) {
 762             for (;;) {
 763                 ForkJoinTask<?>[] a; int b, cap, k;
 764                 if ((a = array) == null || (cap = a.length) <= 0)
 765                     break;
 766                 ForkJoinTask<?> t = a[k = (b = base) & (cap - 1)];
 767                 U.loadFence();
 768                 if (t == null) {
 769                     if (top - b <= 0)
 770                         break;
 771                 }
 772                 else if (!(t instanceof CompletableFuture
 773                            .AsynchronousCompletionTask))
 774                     break;
 775                 if (blocker != null && blocker.isReleasable())
 776                     break;
 777                 if (base == b && t != null &&
 778                     U.compareAndSetReference(a, slotOffset(k), t, null)) {
 779                     updateBase(b + 1);
 780                     t.doExec();
 781                 }
 782             }
 783         }
 784 
 785         // misc
 786 
 787         /**
 788          * Returns true if internal and not known to be blocked.
 789          */
 790         final boolean isApparentlyUnblocked() {
 791             Thread wt; Thread.State s;
 792             return ((wt = owner) != null && (phase & IDLE) != 0 &&
 793                     (s = wt.getState()) != Thread.State.BLOCKED &&
 794                     s != Thread.State.WAITING &&
 795                     s != Thread.State.TIMED_WAITING);
 796         }
 797 
 798         static {
 799             U = Unsafe.getUnsafe();
 800             Class<WorkQueue> klass = WorkQueue.class;
 801             PHASE = U.objectFieldOffset(klass, "phase");
 802             BASE = U.objectFieldOffset(klass, "base");
 803             TOP = U.objectFieldOffset(klass, "top");
 804             ARRAY = U.objectFieldOffset(klass, "array");
 805         }
 806     }
 807 
 808     // static fields (initialized in static initializer below)
 809 
 810     /**
 811      * Creates a new ForkJoinWorkerThread. This factory is used unless
 812      * overridden in ForkJoinPool constructors.
 813      */
 814     public static final ForkJoinWorkerThreadFactory
 815         defaultForkJoinWorkerThreadFactory;
 816 
 817     /**
 818      * Common (static) pool. Non-null for public use unless a static
 819      * construction exception, but internal usages null-check on use
 820      * to paranoically avoid potential initialization circularities
 821      * as well as to simplify generated code.
 822      */
 823     static final ForkJoinPool common;
 824 
 825     /**
 826      * Sequence number for creating worker names
 827      */
 828     private static volatile int poolIds;
 829 
 830     /**
 831      * Permission required for callers of methods that may start or
 832      * kill threads. Lazily constructed.
 833      */
 834     static volatile RuntimePermission modifyThreadPermission;
 835 
 836     // fields declared in order of their likely layout on most VMs
 837     volatile CountDownLatch termination; // lazily constructed
 838     final Predicate<? super ForkJoinPool> saturate;
 839     final ForkJoinWorkerThreadFactory factory;
 840     final UncaughtExceptionHandler ueh;  // per-worker UEH
 841     final SharedThreadContainer container;
 842     final String workerNamePrefix;       // null for common pool
 843     WorkQueue[] queues;                  // main registry
 844     final long keepAlive;                // milliseconds before dropping if idle
 845     final long config;                   // static configuration bits
 846     volatile long stealCount;            // collects worker nsteals
 847     volatile long threadIds;             // for worker thread names
 848     volatile int runState;               // versioned, lockable
 849     @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
 850     volatile long ctl;                   // main pool control
 851     @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
 852     int parallelism;                     // target number of workers
 853 
 854     // Support for atomic operations
 855     private static final Unsafe U;
 856     private static final long CTL;
 857     private static final long RUNSTATE;
 858     private static final long PARALLELISM;
 859     private static final long THREADIDS;
 860     private static final long TERMINATION;
 861     private static final Object POOLIDS_BASE;
 862     private static final long POOLIDS;
 863 
 864     private boolean compareAndSetCtl(long c, long v) {
 865         return U.compareAndSetLong(this, CTL, c, v);
 866     }
 867     private long compareAndExchangeCtl(long c, long v) {
 868         return U.compareAndExchangeLong(this, CTL, c, v);
 869     }
 870     private long getAndAddCtl(long v) {
 871         return U.getAndAddLong(this, CTL, v);
 872     }
 873     private long incrementThreadIds() {
 874         return U.getAndAddLong(this, THREADIDS, 1L);
 875     }
 876     private static int getAndAddPoolIds(int x) {
 877         return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x);
 878     }
 879     private int getAndSetParallelism(int v) {
 880         return U.getAndSetInt(this, PARALLELISM, v);
 881     }
 882     private int getParallelismOpaque() {
 883         return U.getIntOpaque(this, PARALLELISM);
 884     }
 885     private CountDownLatch cmpExTerminationSignal(CountDownLatch x) {
 886         return (CountDownLatch)
 887             U.compareAndExchangeReference(this, TERMINATION, null, x);
 888     }
 889 
 890     // runState operations
 891 
 892     private int getAndBitwiseOrRunState(int v) { // for status bits
 893         return U.getAndBitwiseOrInt(this, RUNSTATE, v);
 894     }
 895     private boolean casRunState(int c, int v) {
 896         return U.compareAndSetInt(this, RUNSTATE, c, v);
 897     }
 898     private void unlockRunState() {              // increment lock bit
 899         U.getAndAddInt(this, RUNSTATE, RS_LOCK);
 900     }
 901     private int lockRunState() {                // lock and return current state
 902         int s, u;                               // locked when RS_LOCK set
 903         if (((s = runState) & RS_LOCK) == 0 && casRunState(s, u = s + RS_LOCK))
 904             return u;
 905         else
 906             return spinLockRunState();
 907     }
 908     private int spinLockRunState() {            // spin/sleep
 909         for (int waits = 0, s, u;;) {
 910             if (((s = runState) & RS_LOCK) == 0) {
 911                 if (casRunState(s, u = s + RS_LOCK))
 912                     return u;
 913                 waits = 0;
 914             } else if (waits < SPIN_WAITS) {
 915                 ++waits;
 916                 Thread.onSpinWait();
 917             } else {
 918                 if (waits < MIN_SLEEP)
 919                     waits = MIN_SLEEP;
 920                 LockSupport.parkNanos(this, (long)waits);
 921                 if (waits < MAX_SLEEP)
 922                     waits <<= 1;
 923             }
 924         }
 925     }
 926 
 927     static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask
 928         return p != null && (p.runState & STOP) != 0;
 929     }
 930 
 931     // Creating, registering, and deregistering workers
 932 
 933     /**
 934      * Tries to construct and start one worker. Assumes that total
 935      * count has already been incremented as a reservation.  Invokes
 936      * deregisterWorker on any failure.
 937      *
 938      * @return true if successful
 939      */
 940     private boolean createWorker() {
 941         ForkJoinWorkerThreadFactory fac = factory;
 942         SharedThreadContainer ctr = container;
 943         Throwable ex = null;
 944         ForkJoinWorkerThread wt = null;
 945         try {
 946             if ((runState & STOP) == 0 &&  // avoid construction if terminating
 947                 fac != null && (wt = fac.newThread(this)) != null) {
 948                 if (ctr != null)
 949                     ctr.start(wt);
 950                 else
 951                     wt.start();
 952                 return true;
 953             }
 954         } catch (Throwable rex) {
 955             ex = rex;
 956         }
 957         deregisterWorker(wt, ex);
 958         return false;
 959     }
 960 
 961     /**
 962      * Provides a name for ForkJoinWorkerThread constructor.
 963      */
 964     final String nextWorkerThreadName() {
 965         String prefix = workerNamePrefix;
 966         long tid = incrementThreadIds() + 1L;
 967         if (prefix == null) // commonPool has no prefix
 968             prefix = "ForkJoinPool.commonPool-worker-";
 969         return prefix.concat(Long.toString(tid));
 970     }
 971 
 972     /**
 973      * Finishes initializing and records internal queue.
 974      *
 975      * @param w caller's WorkQueue
 976      */
 977     final void registerWorker(WorkQueue w) {
 978         if (w != null) {
 979             w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
 980             ThreadLocalRandom.localInit();
 981             int seed = w.stackPred = ThreadLocalRandom.getProbe();
 982             int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
 983             int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
 984             int stop = lockRunState() & STOP;
 985             try {
 986                 WorkQueue[] qs; int n;
 987                 if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0) {
 988                     for (int k = n, m = n - 1;  ; id += 2) {
 989                         if (qs[id &= m] == null)
 990                             break;
 991                         if ((k -= 2) <= 0) {
 992                             id |= n;
 993                             break;
 994                         }
 995                     }
 996                     w.phase = id | phaseSeq;    // now publishable
 997                     if (id < n)
 998                         qs[id] = w;
 999                     else {                      // expand
1000                         int an = n << 1, am = an - 1;
1001                         WorkQueue[] as = new WorkQueue[an];
1002                         as[id & am] = w;
1003                         for (int j = 1; j < n; j += 2)
1004                             as[j] = qs[j];
1005                         for (int j = 0; j < n; j += 2) {
1006                             WorkQueue q;        // shared queues may move
1007                             if ((q = qs[j]) != null)
1008                                 as[q.phase & EXTERNAL_ID_MASK & am] = q;
1009                         }
1010                         U.storeFence();         // fill before publish
1011                         queues = as;
1012                     }
1013                 }
1014             } finally {
1015                 unlockRunState();
1016             }
1017         }
1018     }
1019 
1020     /**
1021      * Final callback from terminating worker, as well as upon failure
1022      * to construct or start a worker.  Removes record of worker from
1023      * array, and adjusts counts. If pool is shutting down, tries to
1024      * complete termination.
1025      *
1026      * @param wt the worker thread, or null if construction failed
1027      * @param ex the exception causing failure, or null if none
1028      */
1029     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1030         WorkQueue w = null;
1031         int src = 0, phase = 0;
1032         boolean replaceable = false;
1033         if (wt != null && (w = wt.workQueue) != null) {
1034             phase = w.phase;
1035             if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
1036                 w.source = DEREGISTERED;
1037                 if (phase != 0) {         // else failed to start
1038                     replaceable = true;
1039                     if ((phase & IDLE) != 0)
1040                         releaseAll();     // pool stopped before released
1041                 }
1042             }
1043         }
1044         if (src != DEREGISTERED) {        // decrement counts
1045             long c = ctl;
1046             do {} while (c != (c = compareAndExchangeCtl(
1047                                    c, ((RC_MASK & (c - RC_UNIT)) |
1048                                        (TC_MASK & (c - TC_UNIT)) |
1049                                        (LMASK & c)))));
1050             if (w != null) {              // cancel remaining tasks
1051                 for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
1052                     try {
1053                         t.cancel(false);
1054                     } catch (Throwable ignore) {
1055                     }
1056                 }
1057             }
1058         }
1059         if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
1060             WorkQueue[] qs; int n, i;     // remove index unless terminating
1061             long ns = w.nsteals & 0xffffffffL;
1062             if ((lockRunState() & STOP) != 0)
1063                 replaceable = false;
1064             else if ((qs = queues) != null && (n = qs.length) > 0 &&
1065                      qs[i = phase & SMASK & (n - 1)] == w) {
1066                 qs[i] = null;
1067                 stealCount += ns;         // accumulate steals
1068             }
1069             unlockRunState();
1070             if (replaceable)
1071                 signalWork(null, 0);
1072         }
1073         if (ex != null)
1074             ForkJoinTask.rethrow(ex);
1075     }
1076 
1077     /**
1078      * Releases an idle worker, or creates one if not enough exist,
1079      * returning on contention if a signal task is already taken.
1080      *
1081      * @param a if nonnull, a task array holding task signalled
1082      * @param k index of task in array
1083      */
1084     final void signalWork(ForkJoinTask<?>[] a, int k) {
1085         int pc = parallelism;
1086         for (long c = ctl;;) {
1087             WorkQueue[] qs = queues;
1088             long ac = (c + RC_UNIT) & RC_MASK, nc;
1089             int sp = (int)c, i = sp & SMASK;
1090             if ((short)(c >>> RC_SHIFT) >= pc)
1091                 break;
1092             if (qs == null || qs.length <= i)
1093                 break;
1094             WorkQueue w = qs[i], v = null;
1095             if (sp == 0) {
1096                 if ((short)(c >>> TC_SHIFT) >= pc)
1097                     break;
1098                 nc = ac | ((c + TC_UNIT) & TC_MASK);
1099             }
1100             else if ((v = w) == null)
1101                 break;
1102             else
1103                 nc = ac | (c & TC_MASK) | (v.stackPred & LMASK);
1104             if (a != null && a.length > k && k >= 0 && a[k] == null)
1105                 break;
1106             if (c == (c = compareAndExchangeCtl(c, nc))) {
1107                 if (v == null)
1108                     createWorker();
1109                 else {
1110                     v.phase = sp;
1111                     if (v.parking != 0)
1112                         U.unpark(v.owner);
1113                 }
1114                 break;
1115             }
1116         }
1117     }
1118 
1119     /**
1120      * Reactivates the given worker, and possibly others if not top of
1121      * ctl stack. Called only during shutdown to ensure release on
1122      * termination.
1123      */
1124     private void releaseAll() {
1125         for (long c = ctl;;) {
1126             WorkQueue[] qs; WorkQueue v; int sp, i;
1127             if ((sp = (int)c) == 0 || (qs = queues) == null ||
1128                 qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
1129                 break;
1130             if (c == (c = compareAndExchangeCtl(
1131                           c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
1132                               (v.stackPred & LMASK))))) {
1133                 v.phase = sp;
1134                 if (v.parking != 0)
1135                     U.unpark(v.owner);
1136             }
1137         }
1138     }
1139 
1140     /**
1141      * Internal version of isQuiescent and related functionality.
1142      * @return true if terminating or all workers are inactive and
1143      * submission queues are empty and unlocked; if so, setting STOP
1144      * if shutdown is enabled
1145      */
1146     private boolean quiescent() {
1147         outer: for (;;) {
1148             long phaseSum = 0L;
1149             boolean swept = false;
1150             for (int e, prevRunState = 0; ; prevRunState = e) {
1151                 long c = ctl;
1152                 if (((e = runState) & STOP) != 0)
1153                     return true;                          // terminating
1154                 else if ((c & RC_MASK) > 0L)
1155                     return false;                         // at least one active
1156                 else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
1157                     long sum = c;
1158                     WorkQueue[] qs = queues; WorkQueue q;
1159                     int n = (qs == null) ? 0 : qs.length;
1160                     for (int i = 0; i < n; ++i) {         // scan queues
1161                         if ((q = qs[i]) != null) {
1162                             int p = q.phase, s = q.top, b = q.base;
1163                             sum += (p & 0xffffffffL) | ((long)b << 32);
1164                             if ((p & IDLE) == 0 || s - b > 0) {
1165                                 if ((i & 1) == 0 && compareAndSetCtl(c, c))
1166                                     signalWork(q.array, q.base);  // ensure live
1167                                 return false;
1168                             }
1169                         }
1170                     }
1171                     swept = (phaseSum == (phaseSum = sum));
1172                 }
1173                 else if ((e & SHUTDOWN) == 0)
1174                     return true;
1175                 else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) {
1176                     releaseAll();                         // confirmed
1177                     return true;                          // enable termination
1178                 }
1179                 else
1180                     break;                                // restart
1181             }
1182         }
1183     }
1184 
1185     /**
1186      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1187      * See above for explanation.
1188      *
1189      * @param w caller's WorkQueue (may be null on failed initialization)
1190      */
1191     final void runWorker(WorkQueue w) {
1192         if (w != null) {
1193             int cfg = w.config & (FIFO|CLEAR_TLS), r = w.stackPred;
1194             do {
1195                 r = scan(w, r, cfg);
1196             } while (awaitWork(w) == 0);
1197         }
1198     }
1199 
1200     /**
1201      * Scans for and executes top-level tasks until none found.
1202      *
1203      * @param w caller's WorkQueue
1204      * @param p w's phase
1205      * @param r random seed
1206      * @param cfg config bits
1207      * @return random seed, for next use
1208      */
1209     private int scan(WorkQueue w, int r, int cfg) {
1210         int inactive = 0;                           // nonzero after empty scan
1211         int nsteals = 0;                            // to update stealCount
1212         boolean screening = false;                  // self-signal precheck
1213         boolean reactivatable = false;              // enable self-signalling
1214         boolean running = false;                    // ran task since activating
1215         boolean rescan = true;
1216         for (;;) {
1217             int e, n; WorkQueue[] qs;
1218             if (((e = runState) & STOP) != 0 || w == null)
1219                 break;
1220             boolean wasScreening = screening;
1221             screening = false;
1222             if (!rescan) {
1223                 if (inactive != 0) {
1224                     int noise = (((SPIN_WAITS >>> 1) - 1) & (r >>> 16)) | 0xf;
1225                     int spins = (wasScreening) ? noise | SPIN_WAITS : noise;
1226                     while ((inactive = w.phase & IDLE) != 0 && --spins > 0)
1227                         Thread.onSpinWait();        // reduce flailing
1228                     if (inactive != 0 && !(wasScreening & reactivatable))
1229                         break;                      // block or terminate
1230                 }
1231                 else {
1232                     long pc = ctl;                  // try to deactivate
1233                     int phase = w.phase;
1234                     long qc = (((phase + (IDLE << 1)) & LMASK) |
1235                                ((pc - RC_UNIT) & UMASK));
1236                     w.stackPred = (int)pc;          // set ctl stack link
1237                     w.phase = phase | IDLE;
1238                     if (runState != e || !compareAndSetCtl(pc, qc))
1239                         w.phase = phase;            // back out
1240                     else if ((qc & RC_MASK) <= 0L)
1241                         break;                      // check quiescent shutdown
1242                     else {
1243                         screening = true;
1244                         running = false;
1245                         inactive = IDLE;
1246                     }
1247                 }
1248             }
1249             else if (inactive != 0)
1250                 inactive = w.phase & IDLE;
1251             rescan = reactivatable = false;
1252             r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
1253             if ((qs = queues) == null || (n = qs.length) <= 0)
1254                 break;                              // currently impossible
1255             int steps = n, i = r, stride = (r >>> 24) | 1;
1256             do {
1257                 int src, cap; WorkQueue q; ForkJoinTask<?>[] a;
1258                 if ((q = qs[src = (i += stride) & (n - 1)]) != null &&
1259                     (a = q.array) != null && (cap = a.length) > 0) {
1260                     for (int b = 0;;) {
1261                         int pb = b, k, nk;
1262                         ForkJoinTask<?> t = a[k = (cap - 1) & (b = q.base)];
1263                         U.loadFence();              // re-read
1264                         if (q.base != b) {          // inconsistent
1265                             if (rescan | screening)
1266                                 break;              // reduce contention
1267                             else
1268                                 continue;
1269                         }
1270                         if (t == null) {
1271                             ForkJoinTask<?>[] na = q.array;
1272                             if (a[k] != null)       // stale
1273                                 continue;
1274                             else if (a[(b + 1) & (cap - 1)] == null) {
1275                                 if (na != a)        // resized
1276                                     rescan = true;  // else probably empty
1277                                 break;
1278                             }
1279                             else if (rescan | screening)
1280                                 break;              // already retrying
1281                             else if (pb == b) {
1282                                 rescan = true;
1283                                 break;              // retry later
1284                             }
1285                             else
1286                                 continue;           // not (yet) stalled
1287                         }
1288                         long kp = slotOffset(k);
1289                         if (inactive != 0) {        // recheck or reactivate
1290                             int p = w.phase, sp = w.stackPred, nextp; long c;
1291                             if ((inactive = p & IDLE) == 0)
1292                                 rescan = true;
1293                             else if (screening) {
1294                                 reactivatable = true;
1295                                 break;              // not yet enabled
1296                             }
1297                             else if ((nextp = p + 1) != (int)(c = ctl))
1298                                 break;              // ineligible
1299                             else if (q.base != b)
1300                                 break;              // already taken
1301                             else if (!compareAndSetCtl(
1302                                          c, ((sp & LMASK) |
1303                                              ((c + RC_UNIT) & UMASK))))
1304                                 break;              // lost race
1305                             else {
1306                                 w.phase = nextp;
1307                                 inactive = 0;       // self-signalled
1308                             }
1309                         }
1310                         if (!U.compareAndSetReference(a, kp, t, null)) {
1311                             if (q.base != b) {      // contention
1312                                 if (rescan | screening)
1313                                     break;
1314                                 ++b;                // for next stall check
1315                             }
1316                         }
1317                         else {
1318                             q.base = ++b;           // taken
1319                             ForkJoinTask<?> nt = a[nk = b & (cap - 1)];
1320                             w.source = src;         // volatile write
1321                             ++nsteals;
1322                             boolean propagate = !running;
1323                             rescan = running = true;
1324                             if (propagate && nt != null)
1325                                 signalWork(a, nk);
1326                             w.topLevelExec(t, cfg);
1327                             if (q.base != b)
1328                                 break;             // reduce interference
1329                         }
1330                     }
1331                 }
1332             } while (--steps > 0);
1333         }
1334         if (nsteals > 0 && w != null)
1335             w.nsteals += nsteals;
1336         return r;
1337     }
1338 
1339     /**
1340      * Awaits signal or termination.
1341      *
1342      * @param w the WorkQueue (may be null if already terminated)
1343      * @return nonzero for exit
1344      */
1345     private int awaitWork(WorkQueue w) {
1346         int p = IDLE, phase;
1347         if ((runState & STOP) == 0 &&      // check for quiescent shutdown
1348             ((ctl & RC_MASK) > 0L || !quiescent() || (runState & STOP) == 0) &&
1349             w != null && (p = (phase = w.phase) & IDLE) != 0) {
1350             LockSupport.setCurrentBlocker(this);
1351             int active = phase + IDLE;     // next w.phase
1352             long c, deadline = 0L;         // set if all idle and w is ctl top
1353             if (((c = ctl) & RC_MASK) <= 0L && (int)c == active) {
1354                 int np = parallelism, nt = (short)(c >>> TC_SHIFT);
1355                 long delay = keepAlive;    // scale if not fully populated
1356                 if (nt != (nt = Math.max(nt, np)) && nt > 0)
1357                     delay = Math.max(TIMEOUT_SLOP, delay / nt);
1358                 long d = delay + System.currentTimeMillis();
1359                 deadline = (d == 0L) ? 1L : d;
1360             }
1361             if ((p = w.phase & IDLE) != 0) {
1362                 w.parking = 1;             // enable unpark
1363                 for (;;) {                 // emulate LockSupport.park
1364                     if ((runState & STOP) != 0)
1365                         break;
1366                     if ((p = w.phase & IDLE) == 0)
1367                         break;
1368                     U.park(deadline != 0L, deadline);
1369                     if ((p = w.phase & IDLE) == 0)
1370                         break;
1371                     if ((runState & STOP) != 0)
1372                         break;
1373                     Thread.interrupted();  // clear for next park
1374                     if (deadline != 0L &&  // try to trim
1375                         deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
1376                         long sp = w.stackPred & LMASK, dc = ctl;
1377                         long nc = sp | (UMASK & (dc - TC_UNIT));
1378                         if ((int)dc == active && compareAndSetCtl(dc, nc)) {
1379                             WorkQueue[] qs; WorkQueue v; int vp, i;
1380                             w.source = DEREGISTERED;
1381                             w.phase = active; // try to wake up next waiter
1382                             if ((vp = (int)nc) != 0 && (qs = queues) != null &&
1383                                 qs.length > (i = vp & SMASK) &&
1384                                 (v = qs[i]) != null &&
1385                                 compareAndSetCtl(nc, ((UMASK & (nc + RC_UNIT)) |
1386                                                       (nc & TC_MASK) |
1387                                                       (v.stackPred & LMASK)))) {
1388                                 v.phase = vp;
1389                                 U.unpark(v.owner);
1390                             }
1391                             break;
1392                         }
1393                         deadline = 0L;     // no longer trimmable
1394                     }
1395                 }
1396                 w.parking = 0;             // disable unpark
1397             }
1398             LockSupport.setCurrentBlocker(null);
1399         }
1400         return p;
1401     }
1402 
1403     /**
1404      * Scans for and returns a polled task, if available.  Used only
1405      * for untracked polls. Begins scan at a random index to avoid
1406      * systematic unfairness.
1407      *
1408      * @param submissionsOnly if true, only scan submission queues
1409      */
1410     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1411         if ((runState & STOP) == 0) {
1412             WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
1413             int r = ThreadLocalRandom.nextSecondarySeed();
1414             if (submissionsOnly)                 // even indices only
1415                 r &= ~1;
1416             int step = (submissionsOnly) ? 2 : 1;
1417             if ((qs = queues) != null && (n = qs.length) > 0) {
1418                 for (int i = n; i > 0; i -= step, r += step) {
1419                     if ((q = qs[r & (n - 1)]) != null &&
1420                         (t = q.poll()) != null)
1421                         return t;
1422                 }
1423             }
1424         }
1425         return null;
1426     }
1427 
1428     /**
1429      * Tries to decrement counts (sometimes implicitly) and possibly
1430      * arrange for a compensating worker in preparation for
1431      * blocking. May fail due to interference, in which case -1 is
1432      * returned so caller may retry. A zero return value indicates
1433      * that the caller doesn't need to re-adjust counts when later
1434      * unblocked.
1435      *
1436      * @param c incoming ctl value
1437      * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
1438      */
1439     private int tryCompensate(long c) {
1440         Predicate<? super ForkJoinPool> sat;
1441         long b = config;
1442         int pc        = parallelism,                    // unpack fields
1443             minActive = (short)(b >>> RC_SHIFT),
1444             maxTotal  = (short)(b >>> TC_SHIFT) + pc,
1445             active    = (short)(c >>> RC_SHIFT),
1446             total     = (short)(c >>> TC_SHIFT),
1447             sp        = (int)c,
1448             stat      = -1;                             // default retry return
1449         if (sp != 0 && active <= pc) {                  // activate idle worker
1450             WorkQueue[] qs; WorkQueue v; int i;
1451             if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
1452                 (v = qs[i]) != null &&
1453                 compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
1454                 v.phase = sp;
1455                 if (v.parking != 0)
1456                     U.unpark(v.owner);
1457                 stat = UNCOMPENSATE;
1458             }
1459         }
1460         else if (active > minActive && total >= pc) {   // reduce active workers
1461             if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
1462                 stat = UNCOMPENSATE;
1463         }
1464         else if (total < maxTotal && total < MAX_CAP) { // try to expand pool
1465             long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1466             if ((runState & STOP) != 0)                 // terminating
1467                 stat = 0;
1468             else if (compareAndSetCtl(c, nc))
1469                 stat = createWorker() ? UNCOMPENSATE : 0;
1470         }
1471         else if (!compareAndSetCtl(c, c))               // validate
1472             ;
1473         else if ((sat = saturate) != null && sat.test(this))
1474             stat = 0;
1475         else
1476             throw new RejectedExecutionException(
1477                 "Thread limit exceeded replacing blocked worker");
1478         return stat;
1479     }
1480 
1481     /**
1482      * Readjusts RC count; called from ForkJoinTask after blocking.
1483      */
1484     final void uncompensate() {
1485         getAndAddCtl(RC_UNIT);
1486     }
1487 
1488     /**
1489      * Helps if possible until the given task is done.  Processes
1490      * compatible local tasks and scans other queues for task produced
1491      * by w's stealers; returning compensated blocking sentinel if
1492      * none are found.
1493      *
1494      * @param task the task
1495      * @param w caller's WorkQueue
1496      * @param internal true if w is owned by a ForkJoinWorkerThread
1497      * @return task status on exit, or UNCOMPENSATE for compensated blocking
1498      */
1499 
1500     final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
1501         if (w != null)
1502             w.tryRemoveAndExec(task, internal);
1503         int s = 0;
1504         if (task != null && (s = task.status) >= 0 && internal && w != null) {
1505             int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source;
1506             long sctl = 0L;                             // track stability
1507             outer: for (boolean rescan = true;;) {
1508                 if ((s = task.status) < 0)
1509                     break;
1510                 if (!rescan) {
1511                     if ((runState & STOP) != 0)
1512                         break;
1513                     if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0)
1514                         break;
1515                 }
1516                 rescan = false;
1517                 WorkQueue[] qs = queues;
1518                 int n = (qs == null) ? 0 : qs.length;
1519                 scan: for (int l = n >>> 1; l > 0; --l, r += 2) {
1520                     int j; WorkQueue q;
1521                     if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
1522                         for (;;) {
1523                             int sq = q.source, b, cap, k; ForkJoinTask<?>[] a;
1524                             if ((a = q.array) == null || (cap = a.length) <= 0)
1525                                 break;
1526                             ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
1527                             U.loadFence();
1528                             boolean eligible = false;
1529                             if (t == task)
1530                                 eligible = true;
1531                             else if (t != null) {       // check steal chain
1532                                 for (int v = sq, d = cap;;) {
1533                                     WorkQueue p;
1534                                     if (v == wid) {
1535                                         eligible = true;
1536                                         break;
1537                                     }
1538                                     if ((v & 1) == 0 || // external or none
1539                                         --d < 0 ||      // bound depth
1540                                         (p = qs[v & (n - 1)]) == null)
1541                                         break;
1542                                     v = p.source;
1543                                 }
1544                             }
1545                             if ((s = task.status) < 0)
1546                                 break outer;            // validate
1547                             if (q.source == sq && q.base == b && a[k] == t) {
1548                                 int nb = b + 1, nk = nb & (cap - 1);
1549                                 if (!eligible) {        // revisit if nonempty
1550                                     if (!rescan && t == null &&
1551                                         (a[nk] != null || q.top - b > 0))
1552                                         rescan = true;
1553                                     break;
1554                                 }
1555                                 if (U.compareAndSetReference(
1556                                         a, slotOffset(k), t, null)) {
1557                                     q.updateBase(nb);
1558                                     w.source = j;
1559                                     t.doExec();
1560                                     w.source = wsrc;
1561                                     rescan = true;   // restart at index r
1562                                     break scan;
1563                                 }
1564                             }
1565                         }
1566                     }
1567                 }
1568             }
1569         }
1570         return s;
1571     }
1572 
1573     /**
1574      * Version of helpJoin for CountedCompleters.
1575      *
1576      * @param task root of computation (only called when a CountedCompleter)
1577      * @param w caller's WorkQueue
1578      * @param internal true if w is owned by a ForkJoinWorkerThread
1579      * @return task status on exit, or UNCOMPENSATE for compensated blocking
1580      */
1581     final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
1582         int s = 0;
1583         if (task != null && (s = task.status) >= 0 && w != null) {
1584             int r = w.phase + 1;                          // for indexing
1585             long sctl = 0L;                               // track stability
1586             outer: for (boolean rescan = true, locals = true;;) {
1587                 if (locals && (s = w.helpComplete(task, internal, 0)) < 0)
1588                     break;
1589                 if ((s = task.status) < 0)
1590                     break;
1591                 if (!rescan) {
1592                     if ((runState & STOP) != 0)
1593                         break;
1594                     if (sctl == (sctl = ctl) &&
1595                         (!internal || (s = tryCompensate(sctl)) >= 0))
1596                         break;
1597                 }
1598                 rescan = locals = false;
1599                 WorkQueue[] qs = queues;
1600                 int n = (qs == null) ? 0 : qs.length;
1601                 scan: for (int l = n; l > 0; --l, ++r) {
1602                     int j; WorkQueue q;
1603                     if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
1604                         for (;;) {
1605                             ForkJoinTask<?>[] a; int b, cap, k;
1606                             if ((a = q.array) == null || (cap = a.length) <= 0)
1607                                 break;
1608                             ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
1609                             U.loadFence();
1610                             boolean eligible = false;
1611                             if (t instanceof CountedCompleter) {
1612                                 CountedCompleter<?> f = (CountedCompleter<?>)t;
1613                                 for (int steps = cap; steps > 0; --steps) {
1614                                     if (f == task) {
1615                                         eligible = true;
1616                                         break;
1617                                     }
1618                                     if ((f = f.completer) == null)
1619                                         break;
1620                                 }
1621                             }
1622                             if ((s = task.status) < 0)    // validate
1623                                 break outer;
1624                             if (q.base == b) {
1625                                 int nb = b + 1, nk = nb & (cap - 1);
1626                                 if (eligible) {
1627                                     if (U.compareAndSetReference(
1628                                             a, slotOffset(k), t, null)) {
1629                                         q.updateBase(nb);
1630                                         t.doExec();
1631                                         locals = rescan = true;
1632                                         break scan;
1633                                     }
1634                                 }
1635                                 else if (a[k] == t) {
1636                                     if (!rescan && t == null &&
1637                                         (a[nk] != null || q.top - b > 0))
1638                                         rescan = true;    // revisit
1639                                     break;
1640                                 }
1641                             }
1642                         }
1643                     }
1644                 }
1645             }
1646         }
1647         return s;
1648      }
1649 
1650     /**
1651      * Runs tasks until all workers are inactive and no tasks are
1652      * found. Rather than blocking when tasks cannot be found, rescans
1653      * until all others cannot find tasks either.
1654      *
1655      * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
1656      * @param interruptible true if return on interrupt
1657      * @return positive if quiescent, negative if interrupted, else 0
1658      */
1659     private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
1660         int phase; // w.phase inactive bit set when temporarily quiescent
1661         if (w == null || ((phase = w.phase) & IDLE) != 0)
1662             return 0;
1663         int wsrc = w.source;
1664         long startTime = System.nanoTime();
1665         long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos
1666         long prevSum = 0L;
1667         int activePhase = phase, inactivePhase = phase + IDLE;
1668         int r = phase + 1, waits = 0, returnStatus = 1;
1669         boolean locals = true;
1670         for (int e = runState;;) {
1671             if ((e & STOP) != 0)
1672                 break;                      // terminating
1673             if (interruptible && Thread.interrupted()) {
1674                 returnStatus = -1;
1675                 break;
1676             }
1677             if (locals) {                   // run local tasks before (re)polling
1678                 locals = false;
1679                 for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;)
1680                     u.doExec();
1681             }
1682             WorkQueue[] qs = queues;
1683             int n = (qs == null) ? 0 : qs.length;
1684             long phaseSum = 0L;
1685             boolean rescan = false, busy = false;
1686             scan: for (int l = n; l > 0; --l, ++r) {
1687                 int j; WorkQueue q;
1688                 if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) {
1689                     for (;;) {
1690                         ForkJoinTask<?>[] a; int b, cap, k;
1691                         if ((a = q.array) == null || (cap = a.length) <= 0)
1692                             break;
1693                         ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
1694                         if (t != null && phase == inactivePhase) // reactivate
1695                             w.phase = phase = activePhase;
1696                         U.loadFence();
1697                         if (q.base == b && a[k] == t) {
1698                             int nb = b + 1;
1699                             if (t == null) {
1700                                 if (!rescan) {
1701                                     int qp = q.phase, mq = qp & (IDLE | 1);
1702                                     phaseSum += qp;
1703                                     if (mq == 0 || q.top - b > 0)
1704                                         rescan = true;
1705                                     else if (mq == 1)
1706                                         busy = true;
1707                                 }
1708                                 break;
1709                             }
1710                             if (U.compareAndSetReference(
1711                                     a, slotOffset(k), t, null)) {
1712                                 q.updateBase(nb);
1713                                 w.source = j;
1714                                 t.doExec();
1715                                 w.source = wsrc;
1716                                 rescan = locals = true;
1717                                 break scan;
1718                             }
1719                         }
1720                     }
1721                 }
1722             }
1723             if (e != (e = runState) || prevSum != (prevSum = phaseSum) ||
1724                 rescan || (e & RS_LOCK) != 0)
1725                 ;                   // inconsistent
1726             else if (!busy)
1727                 break;
1728             else if (phase == activePhase) {
1729                 waits = 0;          // recheck, then sleep
1730                 w.phase = phase = inactivePhase;
1731             }
1732             else if (System.nanoTime() - startTime > nanos) {
1733                 returnStatus = 0;   // timed out
1734                 break;
1735             }
1736             else if (waits == 0)   // same as spinLockRunState except
1737                 waits = MIN_SLEEP; //   with rescan instead of onSpinWait
1738             else {
1739                 LockSupport.parkNanos(this, (long)waits);
1740                 if (waits < maxSleep)
1741                     waits <<= 1;
1742             }
1743         }
1744         w.phase = activePhase;
1745         return returnStatus;
1746     }
1747 
1748     /**
1749      * Helps quiesce from external caller until done, interrupted, or timeout
1750      *
1751      * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
1752      * @param interruptible true if return on interrupt
1753      * @return positive if quiescent, negative if interrupted, else 0
1754      */
1755     private int externalHelpQuiesce(long nanos, boolean interruptible) {
1756         if (!quiescent()) {
1757             long startTime = System.nanoTime();
1758             long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
1759             for (int waits = 0;;) {
1760                 ForkJoinTask<?> t;
1761                 if (interruptible && Thread.interrupted())
1762                     return -1;
1763                 else if ((t = pollScan(false)) != null) {
1764                     waits = 0;
1765                     t.doExec();
1766                 }
1767                 else if (quiescent())
1768                     break;
1769                 else if (System.nanoTime() - startTime > nanos)
1770                     return 0;
1771                 else if (waits == 0)
1772                     waits = MIN_SLEEP;
1773                 else {
1774                     LockSupport.parkNanos(this, (long)waits);
1775                     if (waits < maxSleep)
1776                         waits <<= 1;
1777                 }
1778             }
1779         }
1780         return 1;
1781     }
1782 
1783     /**
1784      * Helps quiesce from either internal or external caller
1785      *
1786      * @param pool the pool to use, or null if any
1787      * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
1788      * @param interruptible true if return on interrupt
1789      * @return positive if quiescent, negative if interrupted, else 0
1790      */
1791     static final int helpQuiescePool(ForkJoinPool pool, long nanos,
1792                                      boolean interruptible) {
1793         Thread t; ForkJoinPool p; ForkJoinWorkerThread wt;
1794         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
1795             (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
1796             (p == pool || pool == null))
1797             return p.helpQuiesce(wt.workQueue, nanos, interruptible);
1798         else if ((p = pool) != null || (p = common) != null)
1799             return p.externalHelpQuiesce(nanos, interruptible);
1800         else
1801             return 0;
1802     }
1803 
1804     /**
1805      * Gets and removes a local or stolen task for the given worker.
1806      *
1807      * @return a task, if available
1808      */
1809     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1810         ForkJoinTask<?> t;
1811         if (w == null || (t = w.nextLocalTask()) == null)
1812             t = pollScan(false);
1813         return t;
1814     }
1815 
1816     // External operations
1817 
1818     /**
1819      * Finds and locks a WorkQueue for an external submitter, or
1820      * throws RejectedExecutionException if shutdown or terminating.
1821      * @param r current ThreadLocalRandom.getProbe() value
1822      * @param isSubmit false if this is for a common pool fork
1823      */
1824     private WorkQueue submissionQueue(int r) {
1825         if (r == 0) {
1826             ThreadLocalRandom.localInit();           // initialize caller's probe
1827             r = ThreadLocalRandom.getProbe();
1828         }
1829         for (;;) {
1830             int n, i, id; WorkQueue[] qs; WorkQueue q;
1831             if ((qs = queues) == null)
1832                 break;
1833             if ((n = qs.length) <= 0)
1834                 break;
1835             if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
1836                 WorkQueue w = new WorkQueue(null, id, 0, false);
1837                 w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1838                 int stop = lockRunState() & STOP;
1839                 if (stop == 0 && queues == qs && qs[i] == null)
1840                     q = qs[i] = w;                   // else discard; retry
1841                 unlockRunState();
1842                 if (q != null)
1843                     return q;
1844                 if (stop != 0)
1845                     break;
1846             }
1847             else if (!q.tryLockPhase())              // move index
1848                 r = ThreadLocalRandom.advanceProbe(r);
1849             else if ((runState & SHUTDOWN) != 0) {
1850                 q.unlockPhase();                     // check while q lock held
1851                 break;
1852             }
1853             else
1854                 return q;
1855         }
1856         tryTerminate(false, false);
1857         throw new RejectedExecutionException();
1858     }
1859 
1860     private void poolSubmit(boolean signalIfEmpty, ForkJoinTask<?> task) {
1861         Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
1862         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1863             (wt = (ForkJoinWorkerThread)t).pool == this) {
1864             internal = true;
1865             q = wt.workQueue;
1866         }
1867         else {                     // find and lock queue
1868             internal = false;
1869             q = submissionQueue(ThreadLocalRandom.getProbe());
1870         }
1871         q.push(task, signalIfEmpty ? this : null, internal);
1872     }
1873 
1874     /**
1875      * Returns queue for an external submission, bypassing call to
1876      * submissionQueue if already established and unlocked.
1877      */
1878     final WorkQueue externalSubmissionQueue() {
1879         WorkQueue[] qs; WorkQueue q; int n;
1880         int r = ThreadLocalRandom.getProbe();
1881         return (((qs = queues) != null && (n = qs.length) > 0 &&
1882                  (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
1883                  q.tryLockPhase()) ? q : submissionQueue(r));
1884     }
1885 
1886     /**
1887      * Returns queue for an external thread, if one exists that has
1888      * possibly ever submitted to the given pool (nonzero probe), or
1889      * null if none.
1890      */
1891     static WorkQueue externalQueue(ForkJoinPool p) {
1892         WorkQueue[] qs; int n;
1893         int r = ThreadLocalRandom.getProbe();
1894         return (p != null && (qs = p.queues) != null &&
1895                 (n = qs.length) > 0 && r != 0) ?
1896             qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
1897     }
1898 
1899     /**
1900      * Returns external queue for common pool.
1901      */
1902     static WorkQueue commonQueue() {
1903         return externalQueue(common);
1904     }
1905 
1906     /**
1907      * If the given executor is a ForkJoinPool, poll and execute
1908      * AsynchronousCompletionTasks from worker's queue until none are
1909      * available or blocker is released.
1910      */
1911     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
1912         WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
1913         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1914             (wt = (ForkJoinWorkerThread)t).pool == e)
1915             w = wt.workQueue;
1916         else if (e instanceof ForkJoinPool)
1917             w = externalQueue((ForkJoinPool)e);
1918         if (w != null)
1919             w.helpAsyncBlocker(blocker);
1920     }
1921 
1922     /**
1923      * Returns a cheap heuristic guide for task partitioning when
1924      * programmers, frameworks, tools, or languages have little or no
1925      * idea about task granularity.  In essence, by offering this
1926      * method, we ask users only about tradeoffs in overhead vs
1927      * expected throughput and its variance, rather than how finely to
1928      * partition tasks.
1929      *
1930      * In a steady state strict (tree-structured) computation, each
1931      * thread makes available for stealing enough tasks for other
1932      * threads to remain active. Inductively, if all threads play by
1933      * the same rules, each thread should make available only a
1934      * constant number of tasks.
1935      *
1936      * The minimum useful constant is just 1. But using a value of 1
1937      * would require immediate replenishment upon each steal to
1938      * maintain enough tasks, which is infeasible.  Further,
1939      * partitionings/granularities of offered tasks should minimize
1940      * steal rates, which in general means that threads nearer the top
1941      * of computation tree should generate more than those nearer the
1942      * bottom. In perfect steady state, each thread is at
1943      * approximately the same level of computation tree. However,
1944      * producing extra tasks amortizes the uncertainty of progress and
1945      * diffusion assumptions.
1946      *
1947      * So, users will want to use values larger (but not much larger)
1948      * than 1 to both smooth over transient shortages and hedge
1949      * against uneven progress; as traded off against the cost of
1950      * extra task overhead. We leave the user to pick a threshold
1951      * value to compare with the results of this call to guide
1952      * decisions, but recommend values such as 3.
1953      *
1954      * When all threads are active, it is on average OK to estimate
1955      * surplus strictly locally. In steady-state, if one thread is
1956      * maintaining say 2 surplus tasks, then so are others. So we can
1957      * just use estimated queue length.  However, this strategy alone
1958      * leads to serious mis-estimates in some non-steady-state
1959      * conditions (ramp-up, ramp-down, other stalls). We can detect
1960      * many of these by further considering the number of "idle"
1961      * threads, that are known to have zero queued tasks, so
1962      * compensate by a factor of (#idle/#active) threads.
1963      */
1964     static int getSurplusQueuedTaskCount() {
1965         Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
1966         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1967             (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
1968             (q = wt.workQueue) != null) {
1969             int n = q.top - q.base;
1970             int p = pool.parallelism;
1971             int a = (short)(pool.ctl >>> RC_SHIFT);
1972             return n - (a > (p >>>= 1) ? 0 :
1973                         a > (p >>>= 1) ? 1 :
1974                         a > (p >>>= 1) ? 2 :
1975                         a > (p >>>= 1) ? 4 :
1976                         8);
1977         }
1978         return 0;
1979     }
1980 
1981     // Termination
1982 
1983     /**
1984      * Possibly initiates and/or completes pool termination.
1985      *
1986      * @param now if true, unconditionally terminate, else only
1987      * if no work and no active workers
1988      * @param enable if true, terminate when next possible
1989      * @return runState on exit
1990      */
1991     private int tryTerminate(boolean now, boolean enable) {
1992         int e = runState, isShutdown;
1993         if ((e & STOP) == 0) {
1994             if (now) {
1995                 runState = e = (lockRunState() + RS_LOCK) | STOP | SHUTDOWN;
1996                 releaseAll();
1997             }
1998             else if ((isShutdown = (e & SHUTDOWN)) != 0 || enable) {
1999                 if (isShutdown == 0)
2000                     getAndBitwiseOrRunState(SHUTDOWN);
2001                 quiescent();                    // may trigger STOP
2002                 e = runState;
2003             }
2004         }
2005         if ((e & (STOP | TERMINATED)) == STOP) {
2006             if ((ctl & RC_MASK) > 0L)           // avoid if quiescent shutdown
2007                 helpTerminate(now);
2008             if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
2009                 e |= TERMINATED;
2010                 if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
2011                     CountDownLatch done; SharedThreadContainer ctr;
2012                     if ((done = termination) != null)
2013                         done.countDown();
2014                     if ((ctr = container) != null)
2015                         ctr.close();
2016                 }
2017             }
2018         }
2019         return e;
2020     }
2021 
2022     /**
2023      * Cancels tasks and interrupts workers
2024      */
2025     private void helpTerminate(boolean now) {
2026         Thread current = Thread.currentThread();
2027         int r = (int)current.threadId();   // stagger traversals
2028         WorkQueue[] qs = queues;
2029         int n = (qs == null) ? 0 : qs.length;
2030         for (int l = n; l > 0; --l, ++r) {
2031             WorkQueue q; ForkJoinTask<?> t; Thread o;
2032             int j = r & SMASK & (n - 1);
2033             if ((q = qs[j]) != null && q.source != DEREGISTERED) {
2034                 while ((t = q.poll()) != null) {
2035                     try {
2036                         t.cancel(false);
2037                     } catch (Throwable ignore) {
2038                     }
2039                 }
2040                 if ((r & 1) != 0 && (o = q.owner) != null &&
2041                     o != current && q.source != DEREGISTERED &&
2042                     (now || !o.isInterrupted())) {
2043                     try {
2044                         o.interrupt();
2045                     } catch (Throwable ignore) {
2046                     }
2047                 }
2048             }
2049         }
2050     }
2051 
2052     /**
2053      * Returns termination signal, constructing if necessary
2054      */
2055     private CountDownLatch terminationSignal() {
2056         CountDownLatch signal, s, u;
2057         if ((signal = termination) == null)
2058             signal = ((u = cmpExTerminationSignal(
2059                            s = new CountDownLatch(1))) == null) ? s : u;
2060         return signal;
2061     }
2062 
2063     // Exported methods
2064 
2065     // Constructors
2066 
2067     /**
2068      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2069      * java.lang.Runtime#availableProcessors}, using defaults for all
2070      * other parameters (see {@link #ForkJoinPool(int,
2071      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2072      * int, int, int, Predicate, long, TimeUnit)}).
2073      *
2074      * @throws SecurityException if a security manager exists and
2075      *         the caller is not permitted to modify threads
2076      *         because it does not hold {@link
2077      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2078      */
2079     public ForkJoinPool() {
2080         this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2081              defaultForkJoinWorkerThreadFactory, null, false,
2082              0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2083     }
2084 
2085     /**
2086      * Creates a {@code ForkJoinPool} with the indicated parallelism
2087      * level, using defaults for all other parameters (see {@link
2088      * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2089      * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
2090      * long, TimeUnit)}).
2091      *
2092      * @param parallelism the parallelism level
2093      * @throws IllegalArgumentException if parallelism less than or
2094      *         equal to zero, or greater than implementation limit
2095      * @throws SecurityException if a security manager exists and
2096      *         the caller is not permitted to modify threads
2097      *         because it does not hold {@link
2098      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2099      */
2100     public ForkJoinPool(int parallelism) {
2101         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
2102              0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2103     }
2104 
2105     /**
2106      * Creates a {@code ForkJoinPool} with the given parameters (using
2107      * defaults for others -- see {@link #ForkJoinPool(int,
2108      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2109      * int, int, int, Predicate, long, TimeUnit)}).
2110      *
2111      * @param parallelism the parallelism level. For default value,
2112      * use {@link java.lang.Runtime#availableProcessors}.
2113      * @param factory the factory for creating new threads. For default value,
2114      * use {@link #defaultForkJoinWorkerThreadFactory}.
2115      * @param handler the handler for internal worker threads that
2116      * terminate due to unrecoverable errors encountered while executing
2117      * tasks. For default value, use {@code null}.
2118      * @param asyncMode if true,
2119      * establishes local first-in-first-out scheduling mode for forked
2120      * tasks that are never joined. This mode may be more appropriate
2121      * than default locally stack-based mode in applications in which
2122      * worker threads only process event-style asynchronous tasks.
2123      * For default value, use {@code false}.
2124      * @throws IllegalArgumentException if parallelism less than or
2125      *         equal to zero, or greater than implementation limit
2126      * @throws NullPointerException if the factory is null
2127      * @throws SecurityException if a security manager exists and
2128      *         the caller is not permitted to modify threads
2129      *         because it does not hold {@link
2130      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2131      */
2132     public ForkJoinPool(int parallelism,
2133                         ForkJoinWorkerThreadFactory factory,
2134                         UncaughtExceptionHandler handler,
2135                         boolean asyncMode) {
2136         this(parallelism, factory, handler, asyncMode,
2137              0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2138     }
2139 
2140     /**
2141      * Creates a {@code ForkJoinPool} with the given parameters.
2142      *
2143      * @param parallelism the parallelism level. For default value,
2144      * use {@link java.lang.Runtime#availableProcessors}.
2145      *
2146      * @param factory the factory for creating new threads. For
2147      * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2148      *
2149      * @param handler the handler for internal worker threads that
2150      * terminate due to unrecoverable errors encountered while
2151      * executing tasks. For default value, use {@code null}.
2152      *
2153      * @param asyncMode if true, establishes local first-in-first-out
2154      * scheduling mode for forked tasks that are never joined. This
2155      * mode may be more appropriate than default locally stack-based
2156      * mode in applications in which worker threads only process
2157      * event-style asynchronous tasks.  For default value, use {@code
2158      * false}.
2159      *
2160      * @param corePoolSize the number of threads to keep in the pool
2161      * (unless timed out after an elapsed keep-alive). Normally (and
2162      * by default) this is the same value as the parallelism level,
2163      * but may be set to a larger value to reduce dynamic overhead if
2164      * tasks regularly block. Using a smaller value (for example
2165      * {@code 0}) has the same effect as the default.
2166      *
2167      * @param maximumPoolSize the maximum number of threads allowed.
2168      * When the maximum is reached, attempts to replace blocked
2169      * threads fail.  (However, because creation and termination of
2170      * different threads may overlap, and may be managed by the given
2171      * thread factory, this value may be transiently exceeded.)  To
2172      * arrange the same value as is used by default for the common
2173      * pool, use {@code 256} plus the {@code parallelism} level. (By
2174      * default, the common pool allows a maximum of 256 spare
2175      * threads.)  Using a value (for example {@code
2176      * Integer.MAX_VALUE}) larger than the implementation's total
2177      * thread limit has the same effect as using this limit (which is
2178      * the default).
2179      *
2180      * @param minimumRunnable the minimum allowed number of core
2181      * threads not blocked by a join or {@link ManagedBlocker}.  To
2182      * ensure progress, when too few unblocked threads exist and
2183      * unexecuted tasks may exist, new threads are constructed, up to
2184      * the given maximumPoolSize.  For the default value, use {@code
2185      * 1}, that ensures liveness.  A larger value might improve
2186      * throughput in the presence of blocked activities, but might
2187      * not, due to increased overhead.  A value of zero may be
2188      * acceptable when submitted tasks cannot have dependencies
2189      * requiring additional threads.
2190      *
2191      * @param saturate if non-null, a predicate invoked upon attempts
2192      * to create more than the maximum total allowed threads.  By
2193      * default, when a thread is about to block on a join or {@link
2194      * ManagedBlocker}, but cannot be replaced because the
2195      * maximumPoolSize would be exceeded, a {@link
2196      * RejectedExecutionException} is thrown.  But if this predicate
2197      * returns {@code true}, then no exception is thrown, so the pool
2198      * continues to operate with fewer than the target number of
2199      * runnable threads, which might not ensure progress.
2200      *
2201      * @param keepAliveTime the elapsed time since last use before
2202      * a thread is terminated (and then later replaced if needed).
2203      * For the default value, use {@code 60, TimeUnit.SECONDS}.
2204      *
2205      * @param unit the time unit for the {@code keepAliveTime} argument
2206      *
2207      * @throws IllegalArgumentException if parallelism is less than or
2208      *         equal to zero, or is greater than implementation limit,
2209      *         or if maximumPoolSize is less than parallelism,
2210      *         of if the keepAliveTime is less than or equal to zero.
2211      * @throws NullPointerException if the factory is null
2212      * @throws SecurityException if a security manager exists and
2213      *         the caller is not permitted to modify threads
2214      *         because it does not hold {@link
2215      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2216      * @since 9
2217      */
2218     public ForkJoinPool(int parallelism,
2219                         ForkJoinWorkerThreadFactory factory,
2220                         UncaughtExceptionHandler handler,
2221                         boolean asyncMode,
2222                         int corePoolSize,
2223                         int maximumPoolSize,
2224                         int minimumRunnable,
2225                         Predicate<? super ForkJoinPool> saturate,
2226                         long keepAliveTime,
2227                         TimeUnit unit) {
2228         checkPermission();
2229         int p = parallelism;
2230         if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
2231             throw new IllegalArgumentException();
2232         if (factory == null || unit == null)
2233             throw new NullPointerException();
2234         int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2235         this.parallelism = p;
2236         this.factory = factory;
2237         this.ueh = handler;
2238         this.saturate = saturate;
2239         this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2240         int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
2241         int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
2242         this.config = (((asyncMode ? FIFO : 0) & LMASK) |
2243                        (((long)maxSpares) << TC_SHIFT) |
2244                        (((long)minAvail)  << RC_SHIFT));
2245         this.queues = new WorkQueue[size];
2246         String pid = Integer.toString(getAndAddPoolIds(1) + 1);
2247         String name = "ForkJoinPool-" + pid;
2248         this.workerNamePrefix = name + "-worker-";
2249         this.container = SharedThreadContainer.create(name);
2250     }
2251 
2252     /**
2253      * Constructor for common pool using parameters possibly
2254      * overridden by system properties
2255      */
2256     private ForkJoinPool(byte forCommonPoolOnly) {
2257         ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
2258         UncaughtExceptionHandler handler = null;
2259         int maxSpares = DEFAULT_COMMON_MAX_SPARES;
2260         int pc = 0, preset = 0; // nonzero if size set as property
2261         try {  // ignore exceptions in accessing/parsing properties
2262             String pp = System.getProperty
2263                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2264             if (pp != null) {
2265                 pc = Math.max(0, Integer.parseInt(pp));
2266                 preset = PRESET_SIZE;
2267             }
2268             String ms = System.getProperty
2269                 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
2270             if (ms != null)
2271                 maxSpares = Math.clamp(Integer.parseInt(ms), 0, MAX_CAP);
2272             String sf = System.getProperty
2273                 ("java.util.concurrent.ForkJoinPool.common.threadFactory");
2274             String sh = System.getProperty
2275                 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2276             if (sf != null || sh != null) {
2277                 ClassLoader ldr = ClassLoader.getSystemClassLoader();
2278                 if (sf != null)
2279                     fac = (ForkJoinWorkerThreadFactory)
2280                         ldr.loadClass(sf).getConstructor().newInstance();
2281                 if (sh != null)
2282                     handler = (UncaughtExceptionHandler)
2283                         ldr.loadClass(sh).getConstructor().newInstance();
2284             }
2285         } catch (Exception ignore) {
2286         }
2287         if (preset == 0)
2288             pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
2289         int p = Math.min(pc, MAX_CAP);
2290         int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
2291         this.parallelism = p;
2292         this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) |
2293                        (1L << RC_SHIFT));
2294         this.factory = fac;
2295         this.ueh = handler;
2296         this.keepAlive = DEFAULT_KEEPALIVE;
2297         this.saturate = null;
2298         this.workerNamePrefix = null;
2299         this.queues = new WorkQueue[size];
2300         this.container = SharedThreadContainer.create("ForkJoinPool.commonPool");
2301     }
2302 
2303     /**
2304      * Returns the common pool instance. This pool is statically
2305      * constructed; its run state is unaffected by attempts to {@link
2306      * #shutdown} or {@link #shutdownNow}. However this pool and any
2307      * ongoing processing are automatically terminated upon program
2308      * {@link System#exit}.  Any program that relies on asynchronous
2309      * task processing to complete before program termination should
2310      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2311      * before exit.
2312      *
2313      * @return the common pool instance
2314      * @since 1.8
2315      */
2316     public static ForkJoinPool commonPool() {
2317         // assert common != null : "static init error";
2318         return common;
2319     }
2320 
2321     // Execution methods
2322 
2323     /**
2324      * Performs the given task, returning its result upon completion.
2325      * If the computation encounters an unchecked Exception or Error,
2326      * it is rethrown as the outcome of this invocation.  Rethrown
2327      * exceptions behave in the same way as regular exceptions, but,
2328      * when possible, contain stack traces (as displayed for example
2329      * using {@code ex.printStackTrace()}) of both the current thread
2330      * as well as the thread actually encountering the exception;
2331      * minimally only the latter.
2332      *
2333      * @param task the task
2334      * @param <T> the type of the task's result
2335      * @return the task's result
2336      * @throws NullPointerException if the task is null
2337      * @throws RejectedExecutionException if the task cannot be
2338      *         scheduled for execution
2339      */
2340     public <T> T invoke(ForkJoinTask<T> task) {
2341         Objects.requireNonNull(task);
2342         poolSubmit(true, task);
2343         try {
2344             return task.join();
2345         } catch (RuntimeException | Error unchecked) {
2346             throw unchecked;
2347         } catch (Exception checked) {
2348             throw new RuntimeException(checked);
2349         }
2350     }
2351 
2352     /**
2353      * Arranges for (asynchronous) execution of the given task.
2354      *
2355      * @param task the task
2356      * @throws NullPointerException if the task is null
2357      * @throws RejectedExecutionException if the task cannot be
2358      *         scheduled for execution
2359      */
2360     public void execute(ForkJoinTask<?> task) {
2361         Objects.requireNonNull(task);
2362         poolSubmit(true, task);
2363     }
2364 
2365     // AbstractExecutorService methods
2366 
2367     /**
2368      * @throws NullPointerException if the task is null
2369      * @throws RejectedExecutionException if the task cannot be
2370      *         scheduled for execution
2371      */
2372     @Override
2373     @SuppressWarnings("unchecked")
2374     public void execute(Runnable task) {
2375         poolSubmit(true, (task instanceof ForkJoinTask<?>)
2376                    ? (ForkJoinTask<Void>) task // avoid re-wrap
2377                    : new ForkJoinTask.RunnableExecuteAction(task));
2378     }
2379 
2380     /**
2381      * Submits a ForkJoinTask for execution.
2382      *
2383      * @implSpec
2384      * This method is equivalent to {@link #externalSubmit(ForkJoinTask)}
2385      * when called from a thread that is not in this pool.
2386      *
2387      * @param task the task to submit
2388      * @param <T> the type of the task's result
2389      * @return the task
2390      * @throws NullPointerException if the task is null
2391      * @throws RejectedExecutionException if the task cannot be
2392      *         scheduled for execution
2393      */
2394     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2395         Objects.requireNonNull(task);
2396         poolSubmit(true, task);
2397         return task;
2398     }
2399 
2400     /**
2401      * @throws NullPointerException if the task is null
2402      * @throws RejectedExecutionException if the task cannot be
2403      *         scheduled for execution
2404      */
2405     @Override
2406     public <T> ForkJoinTask<T> submit(Callable<T> task) {
2407         ForkJoinTask<T> t =
2408             (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
2409             new ForkJoinTask.AdaptedCallable<T>(task) :
2410             new ForkJoinTask.AdaptedInterruptibleCallable<T>(task);
2411         poolSubmit(true, t);
2412         return t;
2413     }
2414 
2415     /**
2416      * @throws NullPointerException if the task is null
2417      * @throws RejectedExecutionException if the task cannot be
2418      *         scheduled for execution
2419      */
2420     @Override
2421     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2422         ForkJoinTask<T> t =
2423             (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
2424             new ForkJoinTask.AdaptedRunnable<T>(task, result) :
2425             new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result);
2426         poolSubmit(true, t);
2427         return t;
2428     }
2429 
2430     /**
2431      * @throws NullPointerException if the task is null
2432      * @throws RejectedExecutionException if the task cannot be
2433      *         scheduled for execution
2434      */
2435     @Override
2436     @SuppressWarnings("unchecked")
2437     public ForkJoinTask<?> submit(Runnable task) {
2438         ForkJoinTask<?> f = (task instanceof ForkJoinTask<?>) ?
2439             (ForkJoinTask<Void>) task : // avoid re-wrap
2440             ((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
2441              new ForkJoinTask.AdaptedRunnable<Void>(task, null) :
2442              new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null));
2443         poolSubmit(true, f);
2444         return f;
2445     }
2446 
2447     /**
2448      * Submits the given task as if submitted from a non-{@code ForkJoinTask}
2449      * client. The task is added to a scheduling queue for submissions to the
2450      * pool even when called from a thread in the pool.
2451      *
2452      * @implSpec
2453      * This method is equivalent to {@link #submit(ForkJoinTask)} when called
2454      * from a thread that is not in this pool.
2455      *
2456      * @return the task
2457      * @param task the task to submit
2458      * @param <T> the type of the task's result
2459      * @throws NullPointerException if the task is null
2460      * @throws RejectedExecutionException if the task cannot be
2461      *         scheduled for execution
2462      * @since 20
2463      */
2464     public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2465         Objects.requireNonNull(task);
2466         externalSubmissionQueue().push(task, this, false);
2467         return task;
2468     }
2469 
2470     /**
2471      * Submits the given task without guaranteeing that it will
2472      * eventually execute in the absence of available active threads.
2473      * In some contexts, this method may reduce contention and
2474      * overhead by relying on context-specific knowledge that existing
2475      * threads (possibly including the calling thread if operating in
2476      * this pool) will eventually be available to execute the task.
2477      *
2478      * @param task the task
2479      * @param <T> the type of the task's result
2480      * @return the task
2481      * @throws NullPointerException if the task is null
2482      * @throws RejectedExecutionException if the task cannot be
2483      *         scheduled for execution
2484      * @since 19
2485      */
2486     public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
2487         Objects.requireNonNull(task);
2488         poolSubmit(false, task);
2489         return task;
2490     }
2491 
2492     /**
2493      * Changes the target parallelism of this pool, controlling the
2494      * future creation, use, and termination of worker threads.
2495      * Applications include contexts in which the number of available
2496      * processors changes over time.
2497      *
2498      * @implNote This implementation restricts the maximum number of
2499      * running threads to 32767
2500      *
2501      * @param size the target parallelism level
2502      * @return the previous parallelism level.
2503      * @throws IllegalArgumentException if size is less than 1 or
2504      *         greater than the maximum supported by this pool.
2505      * @throws UnsupportedOperationException this is the{@link
2506      *         #commonPool()} and parallelism level was set by System
2507      *         property {@systemProperty
2508      *         java.util.concurrent.ForkJoinPool.common.parallelism}.
2509      * @throws SecurityException if a security manager exists and
2510      *         the caller is not permitted to modify threads
2511      *         because it does not hold {@link
2512      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2513      * @since 19
2514      */
2515     public int setParallelism(int size) {
2516         if (size < 1 || size > MAX_CAP)
2517             throw new IllegalArgumentException();
2518         if ((config & PRESET_SIZE) != 0)
2519             throw new UnsupportedOperationException("Cannot override System property");
2520         checkPermission();
2521         return getAndSetParallelism(size);
2522     }
2523 
2524     /**
2525      * Uninterrupible version of {@code invokeAll}. Executes the given
2526      * tasks, returning a list of Futures holding their status and
2527      * results when all complete, ignoring interrupts.  {@link
2528      * Future#isDone} is {@code true} for each element of the returned
2529      * list.  Note that a <em>completed</em> task could have
2530      * terminated either normally or by throwing an exception.  The
2531      * results of this method are undefined if the given collection is
2532      * modified while this operation is in progress.
2533      *
2534      * @apiNote This method supports usages that previously relied on an
2535      * incompatible override of
2536      * {@link ExecutorService#invokeAll(java.util.Collection)}.
2537      *
2538      * @param tasks the collection of tasks
2539      * @param <T> the type of the values returned from the tasks
2540      * @return a list of Futures representing the tasks, in the same
2541      *         sequential order as produced by the iterator for the
2542      *         given task list, each of which has completed
2543      * @throws NullPointerException if tasks or any of its elements are {@code null}
2544      * @throws RejectedExecutionException if any task cannot be
2545      *         scheduled for execution
2546      * @since 22
2547      */
2548     public <T> List<Future<T>> invokeAllUninterruptibly(Collection<? extends Callable<T>> tasks) {
2549         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2550         try {
2551             for (Callable<T> t : tasks) {
2552                 ForkJoinTask<T> f = ForkJoinTask.adapt(t);
2553                 futures.add(f);
2554                 poolSubmit(true, f);
2555             }
2556             for (int i = futures.size() - 1; i >= 0; --i)
2557                 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2558             return futures;
2559         } catch (Throwable t) {
2560             for (Future<T> e : futures)
2561                 e.cancel(true);
2562             throw t;
2563         }
2564     }
2565 
2566     /**
2567      * Common support for timed and untimed invokeAll
2568      */
2569     private  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2570                                            long deadline)
2571         throws InterruptedException {
2572         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2573         try {
2574             for (Callable<T> t : tasks) {
2575                 ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t);
2576                 futures.add(f);
2577                 poolSubmit(true, f);
2578             }
2579             for (int i = futures.size() - 1; i >= 0; --i)
2580                 ((ForkJoinTask<?>)futures.get(i))
2581                     .quietlyJoinPoolInvokeAllTask(deadline);
2582             return futures;
2583         } catch (Throwable t) {
2584             for (Future<T> e : futures)
2585                 e.cancel(true);
2586             throw t;
2587         }
2588     }
2589 
2590     @Override
2591     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2592         throws InterruptedException {
2593         return invokeAll(tasks, 0L);
2594     }
2595     // for jdk version < 22, replace with
2596     // /**
2597     //  * @throws NullPointerException       {@inheritDoc}
2598     //  * @throws RejectedExecutionException {@inheritDoc}
2599     //  */
2600     // @Override
2601     // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2602     //     return invokeAllUninterruptibly(tasks);
2603     // }
2604 
2605     @Override
2606     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2607                                          long timeout, TimeUnit unit)
2608         throws InterruptedException {
2609         return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L);
2610     }
2611 
2612     @Override
2613     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2614         throws InterruptedException, ExecutionException {
2615         try {
2616             return new ForkJoinTask.InvokeAnyRoot<T>()
2617                 .invokeAny(tasks, this, false, 0L);
2618         } catch (TimeoutException cannotHappen) {
2619             assert false;
2620             return null;
2621         }
2622     }
2623 
2624     @Override
2625     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
2626                            long timeout, TimeUnit unit)
2627         throws InterruptedException, ExecutionException, TimeoutException {
2628         return new ForkJoinTask.InvokeAnyRoot<T>()
2629             .invokeAny(tasks, this, true, unit.toNanos(timeout));
2630     }
2631 
2632     /**
2633      * Returns the factory used for constructing new workers.
2634      *
2635      * @return the factory used for constructing new workers
2636      */
2637     public ForkJoinWorkerThreadFactory getFactory() {
2638         return factory;
2639     }
2640 
2641     /**
2642      * Returns the handler for internal worker threads that terminate
2643      * due to unrecoverable errors encountered while executing tasks.
2644      *
2645      * @return the handler, or {@code null} if none
2646      */
2647     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2648         return ueh;
2649     }
2650 
2651     /**
2652      * Returns the targeted parallelism level of this pool.
2653      *
2654      * @return the targeted parallelism level of this pool
2655      */
2656     public int getParallelism() {
2657         return Math.max(getParallelismOpaque(), 1);
2658     }
2659 
2660     /**
2661      * Returns the targeted parallelism level of the common pool.
2662      *
2663      * @return the targeted parallelism level of the common pool
2664      * @since 1.8
2665      */
2666     public static int getCommonPoolParallelism() {
2667         return common.getParallelism();
2668     }
2669 
2670     /**
2671      * Returns the number of worker threads that have started but not
2672      * yet terminated.  The result returned by this method may differ
2673      * from {@link #getParallelism} when threads are created to
2674      * maintain parallelism when others are cooperatively blocked.
2675      *
2676      * @return the number of worker threads
2677      */
2678     public int getPoolSize() {
2679         return (short)(ctl >>> TC_SHIFT);
2680     }
2681 
2682     /**
2683      * Returns {@code true} if this pool uses local first-in-first-out
2684      * scheduling mode for forked tasks that are never joined.
2685      *
2686      * @return {@code true} if this pool uses async mode
2687      */
2688     public boolean getAsyncMode() {
2689         return (config & FIFO) != 0;
2690     }
2691 
2692     /**
2693      * Returns an estimate of the number of worker threads that are
2694      * not blocked waiting to join tasks or for other managed
2695      * synchronization. This method may overestimate the
2696      * number of running threads.
2697      *
2698      * @return the number of worker threads
2699      */
2700     public int getRunningThreadCount() {
2701         WorkQueue[] qs; WorkQueue q;
2702         int rc = 0;
2703         if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
2704             for (int i = 1; i < qs.length; i += 2) {
2705                 if ((q = qs[i]) != null && q.isApparentlyUnblocked())
2706                     ++rc;
2707             }
2708         }
2709         return rc;
2710     }
2711 
2712     /**
2713      * Returns an estimate of the number of threads that are currently
2714      * stealing or executing tasks. This method may overestimate the
2715      * number of active threads.
2716      *
2717      * @return the number of active threads
2718      */
2719     public int getActiveThreadCount() {
2720         return Math.max((short)(ctl >>> RC_SHIFT), 0);
2721     }
2722 
2723     /**
2724      * Returns {@code true} if all worker threads are currently idle.
2725      * An idle worker is one that cannot obtain a task to execute
2726      * because none are available to steal from other threads, and
2727      * there are no pending submissions to the pool. This method is
2728      * conservative; it might not return {@code true} immediately upon
2729      * idleness of all threads, but will eventually become true if
2730      * threads remain inactive.
2731      *
2732      * @return {@code true} if all threads are currently idle
2733      */
2734     public boolean isQuiescent() {
2735         return quiescent();
2736     }
2737 
2738     /**
2739      * Returns an estimate of the total number of completed tasks that
2740      * were executed by a thread other than their submitter. The
2741      * reported value underestimates the actual total number of steals
2742      * when the pool is not quiescent. This value may be useful for
2743      * monitoring and tuning fork/join programs: in general, steal
2744      * counts should be high enough to keep threads busy, but low
2745      * enough to avoid overhead and contention across threads.
2746      *
2747      * @return the number of steals
2748      */
2749     public long getStealCount() {
2750         long count = stealCount;
2751         WorkQueue[] qs; WorkQueue q;
2752         if ((qs = queues) != null) {
2753             for (int i = 1; i < qs.length; i += 2) {
2754                 if ((q = qs[i]) != null)
2755                      count += (long)q.nsteals & 0xffffffffL;
2756             }
2757         }
2758         return count;
2759     }
2760 
2761     /**
2762      * Returns an estimate of the total number of tasks currently held
2763      * in queues by worker threads (but not including tasks submitted
2764      * to the pool that have not begun executing). This value is only
2765      * an approximation, obtained by iterating across all threads in
2766      * the pool. This method may be useful for tuning task
2767      * granularities.
2768      *
2769      * @return the number of queued tasks
2770      * @see ForkJoinWorkerThread#getQueuedTaskCount()
2771      */
2772     public long getQueuedTaskCount() {
2773         WorkQueue[] qs; WorkQueue q;
2774         int count = 0;
2775         if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
2776             for (int i = 1; i < qs.length; i += 2) {
2777                 if ((q = qs[i]) != null)
2778                     count += q.queueSize();
2779             }
2780         }
2781         return count;
2782     }
2783 
2784     /**
2785      * Returns an estimate of the number of tasks submitted to this
2786      * pool that have not yet begun executing.  This method may take
2787      * time proportional to the number of submissions.
2788      *
2789      * @return the number of queued submissions
2790      */
2791     public int getQueuedSubmissionCount() {
2792         WorkQueue[] qs; WorkQueue q;
2793         int count = 0;
2794         if ((runState & TERMINATED) == 0 && (qs = queues) != null) {
2795             for (int i = 0; i < qs.length; i += 2) {
2796                 if ((q = qs[i]) != null)
2797                     count += q.queueSize();
2798             }
2799         }
2800         return count;
2801     }
2802 
2803     /**
2804      * Returns {@code true} if there are any tasks submitted to this
2805      * pool that have not yet begun executing.
2806      *
2807      * @return {@code true} if there are any queued submissions
2808      */
2809     public boolean hasQueuedSubmissions() {
2810         WorkQueue[] qs; WorkQueue q;
2811         if ((runState & STOP) == 0 && (qs = queues) != null) {
2812             for (int i = 0; i < qs.length; i += 2) {
2813                 if ((q = qs[i]) != null && q.queueSize() > 0)
2814                     return true;
2815             }
2816         }
2817         return false;
2818     }
2819 
2820     /**
2821      * Removes and returns the next unexecuted submission if one is
2822      * available.  This method may be useful in extensions to this
2823      * class that re-assign work in systems with multiple pools.
2824      *
2825      * @return the next submission, or {@code null} if none
2826      */
2827     protected ForkJoinTask<?> pollSubmission() {
2828         return pollScan(true);
2829     }
2830 
2831     /**
2832      * Removes all available unexecuted submitted and forked tasks
2833      * from scheduling queues and adds them to the given collection,
2834      * without altering their execution status. These may include
2835      * artificially generated or wrapped tasks. This method is
2836      * designed to be invoked only when the pool is known to be
2837      * quiescent. Invocations at other times may not remove all
2838      * tasks. A failure encountered while attempting to add elements
2839      * to collection {@code c} may result in elements being in
2840      * neither, either or both collections when the associated
2841      * exception is thrown.  The behavior of this operation is
2842      * undefined if the specified collection is modified while the
2843      * operation is in progress.
2844      *
2845      * @param c the collection to transfer elements into
2846      * @return the number of elements transferred
2847      */
2848     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2849         int count = 0;
2850         for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
2851             c.add(t);
2852             ++count;
2853         }
2854         return count;
2855     }
2856 
2857     /**
2858      * Returns a string identifying this pool, as well as its state,
2859      * including indications of run state, parallelism level, and
2860      * worker and task counts.
2861      *
2862      * @return a string identifying this pool, as well as its state
2863      */
2864     public String toString() {
2865         // Use a single pass through queues to collect counts
2866         int e = runState;
2867         long st = stealCount;
2868         long qt = 0L, ss = 0L; int rc = 0;
2869         WorkQueue[] qs; WorkQueue q;
2870         if ((qs = queues) != null) {
2871             for (int i = 0; i < qs.length; ++i) {
2872                 if ((q = qs[i]) != null) {
2873                     int size = q.queueSize();
2874                     if ((i & 1) == 0)
2875                         ss += size;
2876                     else {
2877                         qt += size;
2878                         st += (long)q.nsteals & 0xffffffffL;
2879                         if (q.isApparentlyUnblocked())
2880                             ++rc;
2881                     }
2882                 }
2883             }
2884         }
2885 
2886         int pc = parallelism;
2887         long c = ctl;
2888         int tc = (short)(c >>> TC_SHIFT);
2889         int ac = (short)(c >>> RC_SHIFT);
2890         if (ac < 0) // ignore transient negative
2891             ac = 0;
2892         String level = ((e & TERMINATED) != 0 ? "Terminated" :
2893                         (e & STOP)       != 0 ? "Terminating" :
2894                         (e & SHUTDOWN)   != 0 ? "Shutting down" :
2895                         "Running");
2896         return super.toString() +
2897             "[" + level +
2898             ", parallelism = " + pc +
2899             ", size = " + tc +
2900             ", active = " + ac +
2901             ", running = " + rc +
2902             ", steals = " + st +
2903             ", tasks = " + qt +
2904             ", submissions = " + ss +
2905             "]";
2906     }
2907 
2908     /**
2909      * Possibly initiates an orderly shutdown in which previously
2910      * submitted tasks are executed, but no new tasks will be
2911      * accepted. Invocation has no effect on execution state if this
2912      * is the {@link #commonPool()}, and no additional effect if
2913      * already shut down.  Tasks that are in the process of being
2914      * submitted concurrently during the course of this method may or
2915      * may not be rejected.
2916      *
2917      * @throws SecurityException if a security manager exists and
2918      *         the caller is not permitted to modify threads
2919      *         because it does not hold {@link
2920      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2921      */
2922     public void shutdown() {
2923         checkPermission();
2924         if (workerNamePrefix != null) // not common pool
2925             tryTerminate(false, true);
2926     }
2927 
2928     /**
2929      * Possibly attempts to cancel and/or stop all tasks, and reject
2930      * all subsequently submitted tasks.  Invocation has no effect on
2931      * execution state if this is the {@link #commonPool()}, and no
2932      * additional effect if already shut down. Otherwise, tasks that
2933      * are in the process of being submitted or executed concurrently
2934      * during the course of this method may or may not be
2935      * rejected. This method cancels both existing and unexecuted
2936      * tasks, in order to permit termination in the presence of task
2937      * dependencies. So the method always returns an empty list
2938      * (unlike the case for some other Executors).
2939      *
2940      * @return an empty list
2941      * @throws SecurityException if a security manager exists and
2942      *         the caller is not permitted to modify threads
2943      *         because it does not hold {@link
2944      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2945      */
2946     public List<Runnable> shutdownNow() {
2947         checkPermission();
2948         if (workerNamePrefix != null) // not common pool
2949             tryTerminate(true, true);
2950         return Collections.emptyList();
2951     }
2952 
2953     /**
2954      * Returns {@code true} if all tasks have completed following shut down.
2955      *
2956      * @return {@code true} if all tasks have completed following shut down
2957      */
2958     public boolean isTerminated() {
2959         return (tryTerminate(false, false) & TERMINATED) != 0;
2960     }
2961 
2962     /**
2963      * Returns {@code true} if the process of termination has
2964      * commenced but not yet completed.  This method may be useful for
2965      * debugging. A return of {@code true} reported a sufficient
2966      * period after shutdown may indicate that submitted tasks have
2967      * ignored or suppressed interruption, or are waiting for I/O,
2968      * causing this executor not to properly terminate. (See the
2969      * advisory notes for class {@link ForkJoinTask} stating that
2970      * tasks should not normally entail blocking operations.  But if
2971      * they do, they must abort them on interrupt.)
2972      *
2973      * @return {@code true} if terminating but not yet terminated
2974      */
2975     public boolean isTerminating() {
2976         return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP;
2977     }
2978 
2979     /**
2980      * Returns {@code true} if this pool has been shut down.
2981      *
2982      * @return {@code true} if this pool has been shut down
2983      */
2984     public boolean isShutdown() {
2985         return (runState & SHUTDOWN) != 0;
2986     }
2987 
2988     /**
2989      * Blocks until all tasks have completed execution after a
2990      * shutdown request, or the timeout occurs, or the current thread
2991      * is interrupted, whichever happens first. Because the {@link
2992      * #commonPool()} never terminates until program shutdown, when
2993      * applied to the common pool, this method is equivalent to {@link
2994      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2995      *
2996      * @param timeout the maximum time to wait
2997      * @param unit the time unit of the timeout argument
2998      * @return {@code true} if this executor terminated and
2999      *         {@code false} if the timeout elapsed before termination
3000      * @throws InterruptedException if interrupted while waiting
3001      */
3002     public boolean awaitTermination(long timeout, TimeUnit unit)
3003         throws InterruptedException {
3004         long nanos = unit.toNanos(timeout);
3005         CountDownLatch done;
3006         if (workerNamePrefix == null) {    // is common pool
3007             if (helpQuiescePool(this, nanos, true) < 0)
3008                 throw new InterruptedException();
3009             return false;
3010         }
3011         else if ((tryTerminate(false, false) & TERMINATED) != 0 ||
3012                  (done = terminationSignal()) == null ||
3013                  (runState & TERMINATED) != 0)
3014             return true;
3015         else
3016             return done.await(nanos, TimeUnit.NANOSECONDS);
3017     }
3018 
3019     /**
3020      * If called by a ForkJoinTask operating in this pool, equivalent
3021      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
3022      * waits and/or attempts to assist performing tasks until this
3023      * pool {@link #isQuiescent} or the indicated timeout elapses.
3024      *
3025      * @param timeout the maximum time to wait
3026      * @param unit the time unit of the timeout argument
3027      * @return {@code true} if quiescent; {@code false} if the
3028      * timeout elapsed.
3029      */
3030     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3031         return (helpQuiescePool(this, unit.toNanos(timeout), false) > 0);
3032     }
3033 
3034     /**
3035      * Unless this is the {@link #commonPool()}, initiates an orderly
3036      * shutdown in which previously submitted tasks are executed, but
3037      * no new tasks will be accepted, and waits until all tasks have
3038      * completed execution and the executor has terminated.
3039      *
3040      * <p> If already terminated, or this is the {@link
3041      * #commonPool()}, this method has no effect on execution, and
3042      * does not wait. Otherwise, if interrupted while waiting, this
3043      * method stops all executing tasks as if by invoking {@link
3044      * #shutdownNow()}. It then continues to wait until all actively
3045      * executing tasks have completed. Tasks that were awaiting
3046      * execution are not executed. The interrupt status will be
3047      * re-asserted before this method returns.
3048      *
3049      * @throws SecurityException if a security manager exists and
3050      *         shutting down this ExecutorService may manipulate
3051      *         threads that the caller is not permitted to modify
3052      *         because it does not hold {@link
3053      *         java.lang.RuntimePermission}{@code ("modifyThread")},
3054      *         or the security manager's {@code checkAccess} method
3055      *         denies access.
3056      * @since 19
3057      */
3058     @Override
3059     public void close() {
3060         if (workerNamePrefix != null) {
3061             checkPermission();
3062             CountDownLatch done = null;
3063             boolean interrupted = false;
3064             while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
3065                 if (done == null)
3066                     done = terminationSignal();
3067                 else {
3068                     try {
3069                         done.await();
3070                         break;
3071                     } catch (InterruptedException ex) {
3072                         interrupted = true;
3073                     }
3074                 }
3075             }
3076             if (interrupted)
3077                 Thread.currentThread().interrupt();
3078         }
3079     }
3080 
3081     /**
3082      * Interface for extending managed parallelism for tasks running
3083      * in {@link ForkJoinPool}s.
3084      *
3085      * <p>A {@code ManagedBlocker} provides two methods.  Method
3086      * {@link #isReleasable} must return {@code true} if blocking is
3087      * not necessary. Method {@link #block} blocks the current thread
3088      * if necessary (perhaps internally invoking {@code isReleasable}
3089      * before actually blocking). These actions are performed by any
3090      * thread invoking {@link
3091      * ForkJoinPool#managedBlock(ManagedBlocker)}.  The unusual
3092      * methods in this API accommodate synchronizers that may, but
3093      * don't usually, block for long periods. Similarly, they allow
3094      * more efficient internal handling of cases in which additional
3095      * workers may be, but usually are not, needed to ensure
3096      * sufficient parallelism.  Toward this end, implementations of
3097      * method {@code isReleasable} must be amenable to repeated
3098      * invocation. Neither method is invoked after a prior invocation
3099      * of {@code isReleasable} or {@code block} returns {@code true}.
3100      *
3101      * <p>For example, here is a ManagedBlocker based on a
3102      * ReentrantLock:
3103      * <pre> {@code
3104      * class ManagedLocker implements ManagedBlocker {
3105      *   final ReentrantLock lock;
3106      *   boolean hasLock = false;
3107      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3108      *   public boolean block() {
3109      *     if (!hasLock)
3110      *       lock.lock();
3111      *     return true;
3112      *   }
3113      *   public boolean isReleasable() {
3114      *     return hasLock || (hasLock = lock.tryLock());
3115      *   }
3116      * }}</pre>
3117      *
3118      * <p>Here is a class that possibly blocks waiting for an
3119      * item on a given queue:
3120      * <pre> {@code
3121      * class QueueTaker<E> implements ManagedBlocker {
3122      *   final BlockingQueue<E> queue;
3123      *   volatile E item = null;
3124      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3125      *   public boolean block() throws InterruptedException {
3126      *     if (item == null)
3127      *       item = queue.take();
3128      *     return true;
3129      *   }
3130      *   public boolean isReleasable() {
3131      *     return item != null || (item = queue.poll()) != null;
3132      *   }
3133      *   public E getItem() { // call after pool.managedBlock completes
3134      *     return item;
3135      *   }
3136      * }}</pre>
3137      */
3138     public static interface ManagedBlocker {
3139         /**
3140          * Possibly blocks the current thread, for example waiting for
3141          * a lock or condition.
3142          *
3143          * @return {@code true} if no additional blocking is necessary
3144          * (i.e., if isReleasable would return true)
3145          * @throws InterruptedException if interrupted while waiting
3146          * (the method is not required to do so, but is allowed to)
3147          */
3148         boolean block() throws InterruptedException;
3149 
3150         /**
3151          * Returns {@code true} if blocking is unnecessary.
3152          * @return {@code true} if blocking is unnecessary
3153          */
3154         boolean isReleasable();
3155     }
3156 
3157     /**
3158      * Runs the given possibly blocking task.  When {@linkplain
3159      * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
3160      * method possibly arranges for a spare thread to be activated if
3161      * necessary to ensure sufficient parallelism while the current
3162      * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
3163      *
3164      * <p>This method repeatedly calls {@code blocker.isReleasable()} and
3165      * {@code blocker.block()} until either method returns {@code true}.
3166      * Every call to {@code blocker.block()} is preceded by a call to
3167      * {@code blocker.isReleasable()} that returned {@code false}.
3168      *
3169      * <p>If not running in a ForkJoinPool, this method is
3170      * behaviorally equivalent to
3171      * <pre> {@code
3172      * while (!blocker.isReleasable())
3173      *   if (blocker.block())
3174      *     break;}</pre>
3175      *
3176      * If running in a ForkJoinPool, the pool may first be expanded to
3177      * ensure sufficient parallelism available during the call to
3178      * {@code blocker.block()}.
3179      *
3180      * @param blocker the blocker task
3181      * @throws InterruptedException if {@code blocker.block()} did so
3182      */
3183     public static void managedBlock(ManagedBlocker blocker)
3184         throws InterruptedException {
3185         Thread t; ForkJoinPool p;
3186         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3187             (p = ((ForkJoinWorkerThread)t).pool) != null)
3188             p.compensatedBlock(blocker);
3189         else
3190             unmanagedBlock(blocker);
3191     }
3192 
3193     /** ManagedBlock for ForkJoinWorkerThreads */
3194     private void compensatedBlock(ManagedBlocker blocker)
3195         throws InterruptedException {
3196         Objects.requireNonNull(blocker);
3197         for (;;) {
3198             int comp; boolean done;
3199             long c = ctl;
3200             if (blocker.isReleasable())
3201                 break;
3202             if ((runState & STOP) != 0)
3203                 throw new InterruptedException();
3204             if ((comp = tryCompensate(c)) >= 0) {
3205                 try {
3206                     done = blocker.block();
3207                 } finally {
3208                     if (comp > 0)
3209                         getAndAddCtl(RC_UNIT);
3210                 }
3211                 if (done)
3212                     break;
3213             }
3214         }
3215     }
3216 
3217     /**
3218      * Invokes tryCompensate to create or re-activate a spare thread to
3219      * compensate for a thread that performs a blocking operation. When the
3220      * blocking operation is done then endCompensatedBlock must be invoked
3221      * with the value returned by this method to re-adjust the parallelism.
3222      */
3223     final long beginCompensatedBlock() {
3224         int c;
3225         do {} while ((c = tryCompensate(ctl)) < 0);
3226         return (c == 0) ? 0L : RC_UNIT;
3227     }
3228 
3229     /**
3230      * Re-adjusts parallelism after a blocking operation completes.
3231      */
3232     void endCompensatedBlock(long post) {
3233         if (post > 0L) {
3234             getAndAddCtl(post);
3235         }
3236     }
3237 
3238     /** ManagedBlock for external threads */
3239     private static void unmanagedBlock(ManagedBlocker blocker)
3240         throws InterruptedException {
3241         Objects.requireNonNull(blocker);
3242         do {} while (!blocker.isReleasable() && !blocker.block());
3243     }
3244 
3245     @Override
3246     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3247         return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3248             new ForkJoinTask.AdaptedRunnable<T>(runnable, value) :
3249             new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value);
3250     }
3251 
3252     @Override
3253     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3254         return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
3255             new ForkJoinTask.AdaptedCallable<T>(callable) :
3256             new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable);
3257     }
3258 
3259     static {
3260         U = Unsafe.getUnsafe();
3261         Class<ForkJoinPool> klass = ForkJoinPool.class;
3262         try {
3263             Field poolIdsField = klass.getDeclaredField("poolIds");
3264             POOLIDS_BASE = U.staticFieldBase(poolIdsField);
3265             POOLIDS = U.staticFieldOffset(poolIdsField);
3266         } catch (NoSuchFieldException e) {
3267             throw new ExceptionInInitializerError(e);
3268         }
3269         CTL = U.objectFieldOffset(klass, "ctl");
3270         RUNSTATE = U.objectFieldOffset(klass, "runState");
3271         PARALLELISM =  U.objectFieldOffset(klass, "parallelism");
3272         THREADIDS = U.objectFieldOffset(klass, "threadIds");
3273         TERMINATION = U.objectFieldOffset(klass, "termination");
3274         Class<ForkJoinTask[]> aklass = ForkJoinTask[].class;
3275         ABASE = U.arrayBaseOffset(aklass);
3276         int scale = U.arrayIndexScale(aklass);
3277         ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
3278         if ((scale & (scale - 1)) != 0)
3279             throw new Error("array index scale not a power of two");
3280 
3281         defaultForkJoinWorkerThreadFactory =
3282             new DefaultForkJoinWorkerThreadFactory();
3283         @SuppressWarnings("removal")
3284         ForkJoinPool p = common = (System.getSecurityManager() == null) ?
3285             new ForkJoinPool((byte)0) :
3286             AccessController.doPrivileged(new PrivilegedAction<>() {
3287                     public ForkJoinPool run() {
3288                         return new ForkJoinPool((byte)0); }});
3289         // allow access to non-public methods
3290         SharedSecrets.setJavaUtilConcurrentFJPAccess(
3291             new JavaUtilConcurrentFJPAccess() {
3292                 @Override
3293                 public long beginCompensatedBlock(ForkJoinPool pool) {
3294                     return pool.beginCompensatedBlock();
3295                 }
3296                 public void endCompensatedBlock(ForkJoinPool pool, long post) {
3297                     pool.endCompensatedBlock(post);
3298                 }
3299             });
3300         Class<?> dep = LockSupport.class; // ensure loaded
3301     }
3302 }