< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page

  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.security.AccessController;
  42 import java.security.AccessControlContext;
  43 import java.security.Permission;
  44 import java.security.Permissions;
  45 import java.security.PrivilegedAction;
  46 import java.security.ProtectionDomain;
  47 import java.util.ArrayList;
  48 import java.util.Collection;
  49 import java.util.Collections;
  50 import java.util.List;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.atomic.AtomicInteger;
  53 import java.util.concurrent.locks.LockSupport;
  54 import java.util.concurrent.locks.ReentrantLock;
  55 import java.util.concurrent.locks.Condition;

  56 
  57 /**
  58  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  59  * A {@code ForkJoinPool} provides the entry point for submissions
  60  * from non-{@code ForkJoinTask} clients, as well as management and
  61  * monitoring operations.
  62  *
  63  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  64  * ExecutorService} mainly by virtue of employing
  65  * <em>work-stealing</em>: all threads in the pool attempt to find and
  66  * execute tasks submitted to the pool and/or created by other active
  67  * tasks (eventually blocking waiting for work if none exist). This
  68  * enables efficient processing when most tasks spawn other subtasks
  69  * (as do most {@code ForkJoinTask}s), as well as when many small
  70  * tasks are submitted to the pool from external clients.  Especially
  71  * when setting <em>asyncMode</em> to true in constructors, {@code
  72  * ForkJoinPool}s may also be appropriate for use with event-style
  73  * tasks that are never joined. All worker threads are initialized
  74  * with {@link Thread#isDaemon} set {@code true}.
  75  *

 949          */
 950         final int queueSize() {
 951             VarHandle.acquireFence(); // ensure fresh reads by external callers
 952             int n = top - base;
 953             return (n < 0) ? 0 : n;   // ignore transient negative
 954         }
 955 
 956         /**
 957          * Provides a more conservative estimate of whether this queue
 958          * has any tasks than does queueSize.
 959          */
 960         final boolean isEmpty() {
 961             return !((source != 0 && owner == null) || top - base > 0);
 962         }
 963 
 964         /**
 965          * Pushes a task. Call only by owner in unshared queues.
 966          *
 967          * @param task the task. Caller must ensure non-null.
 968          * @param pool (no-op if null)

 969          * @throws RejectedExecutionException if array cannot be resized
 970          */
 971         final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
 972             ForkJoinTask<?>[] a = array;
 973             int s = top++, d = s - base, cap, m; // skip insert if disabled
 974             if (a != null && pool != null && (cap = a.length) > 0) {
 975                 setSlotVolatile(a, (m = cap - 1) & s, task);
 976                 if (d == m)
 977                     growArray();
 978                 if (d == m || a[m & (s - 1)] == null)
 979                     pool.signalWork(); // signal if was empty or resized



 980             }
 981         }
 982 




 983         /**
 984          * Pushes task to a shared queue with lock already held, and unlocks.
 985          *
 986          * @return true if caller should signal work
 987          */
 988         final boolean lockedPush(ForkJoinTask<?> task) {
 989             ForkJoinTask<?>[] a = array;
 990             int s = top++, d = s - base, cap, m;
 991             if (a != null && (cap = a.length) > 0) {
 992                 a[(m = cap - 1) & s] = task;
 993                 if (d == m)
 994                     growArray();
 995                 source = 0; // unlock
 996                 if (d == m || a[m & (s - 1)] == null)
 997                     return true;
 998             }
 999             return false;
1000         }
1001 
1002         /**

1296                 throw new ExceptionInInitializerError(e);
1297             }
1298         }
1299     }
1300 
1301     // static fields (initialized in static initializer below)
1302 
1303     /**
1304      * Creates a new ForkJoinWorkerThread. This factory is used unless
1305      * overridden in ForkJoinPool constructors.
1306      */
1307     public static final ForkJoinWorkerThreadFactory
1308         defaultForkJoinWorkerThreadFactory;
1309 
1310     /**
1311      * Permission required for callers of methods that may start or
1312      * kill threads.
1313      */
1314     static final RuntimePermission modifyThreadPermission;
1315 
1316     /**
1317      * Common (static) pool. Non-null for public use unless a static
1318      * construction exception, but internal usages null-check on use
1319      * to paranoically avoid potential initialization circularities
1320      * as well as to simplify generated code.
1321      */
1322     static final ForkJoinPool common;
1323 
1324     /**
1325      * Common pool parallelism. To allow simpler use and management
1326      * when common pool threads are disabled, we allow the underlying
1327      * common.parallelism field to be zero, but in that case still report
1328      * parallelism as 1 to reflect resulting caller-runs mechanics.
1329      */
1330     static final int COMMON_PARALLELISM;
1331 
1332     /**
1333      * Limit on spare thread construction in tryCompensate.
1334      */
1335     private static final int COMMON_MAX_SPARES;
1336 
1337     /**
1338      * Sequence number for creating worker names
1339      */
1340     private static volatile int poolIds;
1341 
1342     // static configuration constants
1343 
1344     /**
1345      * Default idle timeout value (in milliseconds) for the thread
1346      * triggering quiescence to park waiting for new work
1347      */
1348     private static final long DEFAULT_KEEPALIVE = 60_000L;
1349 
1350     /**
1351      * Undershoot tolerance for idle timeouts
1352      */
1353     private static final long TIMEOUT_SLOP = 20L;
1354 
1355     /**
1356      * The default value for COMMON_MAX_SPARES.  Overridable using the
1357      * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
1358      * property.  The default value is far in excess of normal
1359      * requirements, but also far short of MAX_CAP and typical OS
1360      * thread limits, so allows JVMs to catch misuse/abuse before
1361      * running out of resources needed to do so.
1362      */
1363     private static final int DEFAULT_COMMON_MAX_SPARES = 256;
1364 
1365     /*
1366      * Bits and masks for field ctl, packed with 4 16 bit subfields:
1367      * RC: Number of released (unqueued) workers minus target parallelism
1368      * TC: Number of total workers minus target parallelism
1369      * SS: version count and status of top waiting thread
1370      * ID: poolIndex of top of Treiber stack of waiters
1371      *
1372      * When convenient, we can extract the lower 32 stack top bits
1373      * (including version bits) as sp=(int)ctl.  The offsets of counts
1374      * by the target parallelism and the positionings of fields makes
1375      * it possible to perform the most common checks via sign tests of
1376      * fields: When ac is negative, there are not enough unqueued
1377      * workers, when tc is negative, there are not enough total
1378      * workers.  When sp is non-zero, there are waiting workers.  To
1379      * deal with possibly negative fields, we use casts in and out of
1380      * "short" and/or signed shifts to maintain signedness.
1381      *
1382      * Because it occupies uppermost bits, we can add one release
1383      * count using getAndAdd of RC_UNIT, rather than CAS, when
1384      * returning from a blocked join.  Other updates entail multiple

1402     private static final int  TC_SHIFT   = 32;
1403     private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
1404     private static final long TC_MASK    = 0xffffL << TC_SHIFT;
1405     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1406 
1407     // Instance fields
1408 
1409     final long keepAlive;                // milliseconds before dropping if idle
1410     volatile long stealCount;            // collects worker nsteals
1411     int scanRover;                       // advances across pollScan calls
1412     volatile int threadIds;              // for worker thread names
1413     final int bounds;                    // min, max threads packed as shorts
1414     volatile int mode;                   // parallelism, runstate, queue mode
1415     WorkQueue[] queues;                  // main registry
1416     final ReentrantLock registrationLock;
1417     Condition termination;               // lazily constructed
1418     final String workerNamePrefix;       // null for common pool
1419     final ForkJoinWorkerThreadFactory factory;
1420     final UncaughtExceptionHandler ueh;  // per-worker UEH
1421     final Predicate<? super ForkJoinPool> saturate;

1422 
1423     @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1424     volatile long ctl;                   // main pool control
1425 
1426     // Support for atomic operations
1427     private static final VarHandle CTL;
1428     private static final VarHandle MODE;
1429     private static final VarHandle THREADIDS;
1430     private static final VarHandle POOLIDS;
1431     private boolean compareAndSetCtl(long c, long v) {
1432         return CTL.compareAndSet(this, c, v);
1433     }
1434     private long compareAndExchangeCtl(long c, long v) {
1435         return (long)CTL.compareAndExchange(this, c, v);
1436     }
1437     private long getAndAddCtl(long v) {
1438         return (long)CTL.getAndAdd(this, v);
1439     }
1440     private int getAndBitwiseOrMode(int v) {
1441         return (int)MODE.getAndBitwiseOr(this, v);

1445     }
1446     private static int getAndAddPoolIds(int x) {
1447         return (int)POOLIDS.getAndAdd(x);
1448     }
1449 
1450     // Creating, registering and deregistering workers
1451 
1452     /**
1453      * Tries to construct and start one worker. Assumes that total
1454      * count has already been incremented as a reservation.  Invokes
1455      * deregisterWorker on any failure.
1456      *
1457      * @return true if successful
1458      */
1459     private boolean createWorker() {
1460         ForkJoinWorkerThreadFactory fac = factory;
1461         Throwable ex = null;
1462         ForkJoinWorkerThread wt = null;
1463         try {
1464             if (fac != null && (wt = fac.newThread(this)) != null) {
1465                 wt.start();
1466                 return true;
1467             }
1468         } catch (Throwable rex) {
1469             ex = rex;
1470         }
1471         deregisterWorker(wt, ex);
1472         return false;
1473     }
1474 
1475     /**
1476      * Provides a name for ForkJoinWorkerThread constructor.
1477      */
1478     final String nextWorkerThreadName() {
1479         String prefix = workerNamePrefix;
1480         int tid = getAndAddThreadIds(1) + 1;
1481         if (prefix == null) // commonPool has no prefix
1482             prefix = "ForkJoinPool.commonPool-worker-";
1483         return prefix.concat(Integer.toString(tid));
1484     }
1485 

1663     }
1664 
1665     /**
1666      * Advances worker phase, pushes onto ctl stack, and awaits signal
1667      * or reports termination.
1668      *
1669      * @return negative if terminated, else 0
1670      */
1671     private int awaitWork(WorkQueue w) {
1672         if (w == null)
1673             return -1;                       // already terminated
1674         int phase = (w.phase + SS_SEQ) & ~UNSIGNALLED;
1675         w.phase = phase | UNSIGNALLED;       // advance phase
1676         long prevCtl = ctl, c;               // enqueue
1677         do {
1678             w.stackPred = (int)prevCtl;
1679             c = ((prevCtl - RC_UNIT) & UC_MASK) | (phase & SP_MASK);
1680         } while (prevCtl != (prevCtl = compareAndExchangeCtl(prevCtl, c)));
1681 
1682         Thread.interrupted();                // clear status
1683         LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK)
1684         long deadline = 0L;                  // nonzero if possibly quiescent
1685         int ac = (int)(c >> RC_SHIFT), md;
1686         if ((md = mode) < 0)                 // pool is terminating
1687             return -1;
1688         else if ((md & SMASK) + ac <= 0) {
1689             boolean checkTermination = (md & SHUTDOWN) != 0;
1690             if ((deadline = System.currentTimeMillis() + keepAlive) == 0L)
1691                 deadline = 1L;               // avoid zero
1692             WorkQueue[] qs = queues;         // check for racing submission
1693             int n = (qs == null) ? 0 : qs.length;
1694             for (int i = 0; i < n; i += 2) {
1695                 WorkQueue q; ForkJoinTask<?>[] a; int cap, b;
1696                 if (ctl != c) {              // already signalled
1697                     checkTermination = false;
1698                     break;
1699                 }
1700                 else if ((q = qs[i]) != null &&
1701                          (a = q.array) != null && (cap = a.length) > 0 &&
1702                          ((b = q.base) != q.top || a[(cap - 1) & b] != null ||
1703                           q.source != 0)) {

2180         else if (q.lockedPush(task))
2181             signalWork();
2182     }
2183 
2184     /**
2185      * Pushes a possibly-external submission.
2186      */
2187     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2188         Thread t; ForkJoinWorkerThread wt; WorkQueue q;
2189         if (task == null)
2190             throw new NullPointerException();
2191         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2192             (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
2193             wt.pool == this)
2194             q.push(task, this);
2195         else
2196             externalPush(task);
2197         return task;
2198     }
2199 














2200     /**
2201      * Returns common pool queue for an external thread that has
2202      * possibly ever submitted a common pool task (nonzero probe), or
2203      * null if none.
2204      */
2205     static WorkQueue commonQueue() {
2206         ForkJoinPool p; WorkQueue[] qs;
2207         int r = ThreadLocalRandom.getProbe(), n;
2208         return ((p = common) != null && (qs = p.queues) != null &&
2209                 (n = qs.length) > 0 && r != 0) ?
2210             qs[(n - 1) & (r << 1)] : null;
2211     }
2212 
2213     /**
2214      * Returns queue for an external thread, if one exists
2215      */
2216     final WorkQueue externalQueue() {
2217         WorkQueue[] qs;
2218         int r = ThreadLocalRandom.getProbe(), n;
2219         return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
2220             qs[(n - 1) & (r << 1)] : null;
2221     }
2222 
2223     /**
2224      * If the given executor is a ForkJoinPool, poll and execute
2225      * AsynchronousCompletionTasks from worker's queue until none are
2226      * available or blocker is released.
2227      */
2228     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {

2329                 for (int j = 1; j < n; j += 2) { // unblock other workers
2330                     if ((q = qs[j]) != null && (thread = q.owner) != null &&
2331                         !thread.isInterrupted()) {
2332                         changed = true;
2333                         try {
2334                             thread.interrupt();
2335                         } catch (Throwable ignore) {
2336                         }
2337                     }
2338                 }
2339             }
2340             ReentrantLock lock; Condition cond; // signal when no workers
2341             if (((md = mode) & TERMINATED) == 0 &&
2342                 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2343                 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2344                 (lock = registrationLock) != null) {
2345                 lock.lock();
2346                 if ((cond = termination) != null)
2347                     cond.signalAll();
2348                 lock.unlock();

2349             }
2350             if (changed)
2351                 rescan = true;
2352             else if (rescan)
2353                 rescan = false;
2354             else
2355                 break;
2356         }
2357         return true;
2358     }
2359 
2360     // Exported methods
2361 
2362     // Constructors
2363 
2364     /**
2365      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2366      * java.lang.Runtime#availableProcessors}, using defaults for all
2367      * other parameters (see {@link #ForkJoinPool(int,
2368      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,

2527         if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
2528             throw new IllegalArgumentException();
2529         if (factory == null || unit == null)
2530             throw new NullPointerException();
2531         this.factory = factory;
2532         this.ueh = handler;
2533         this.saturate = saturate;
2534         this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2535         int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2536         int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
2537         int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p;
2538         int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2539         this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
2540         this.mode = p | (asyncMode ? FIFO : 0);
2541         this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
2542                     (((long)(-p)     << RC_SHIFT) & RC_MASK));
2543         this.registrationLock = new ReentrantLock();
2544         this.queues = new WorkQueue[size];
2545         String pid = Integer.toString(getAndAddPoolIds(1) + 1);
2546         this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";



2547     }
2548 
2549     // helper method for commonPool constructor
2550     private static Object newInstanceFromSystemProperty(String property)
2551         throws ReflectiveOperationException {
2552         String className = System.getProperty(property);
2553         return (className == null)
2554             ? null
2555             : ClassLoader.getSystemClassLoader().loadClass(className)
2556             .getConstructor().newInstance();
2557     }
2558 
2559     /**
2560      * Constructor for common pool using parameters possibly
2561      * overridden by system properties
2562      */
2563     private ForkJoinPool(byte forCommonPoolOnly) {
2564         int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
2565         ForkJoinWorkerThreadFactory fac = null;
2566         UncaughtExceptionHandler handler = null;
2567         try {  // ignore exceptions in accessing/parsing properties
2568             fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2569                 "java.util.concurrent.ForkJoinPool.common.threadFactory");
2570             handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2571                 "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2572             String pp = System.getProperty
2573                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2574             if (pp != null)
2575                 parallelism = Integer.parseInt(pp);
2576         } catch (Exception ignore) {
2577         }
2578         this.ueh = handler;
2579         this.keepAlive = DEFAULT_KEEPALIVE;
2580         this.saturate = null;
2581         this.workerNamePrefix = null;
2582         int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
2583         this.mode = p;
2584         if (p > 0) {
2585             size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2586             this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2587             this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2588                         (((long)(-p) << RC_SHIFT) & RC_MASK));
2589         } else {  // zero min, max, spare counts, 1 slot
2590             size = 1;
2591             this.bounds = 0;
2592             this.ctl = 0L;
2593         }
2594         this.factory = (fac != null) ? fac :
2595             new DefaultCommonPoolForkJoinWorkerThreadFactory();
2596         this.queues = new WorkQueue[size];
2597         this.registrationLock = new ReentrantLock();



2598     }
2599 
2600     /**
2601      * Returns the common pool instance. This pool is statically
2602      * constructed; its run state is unaffected by attempts to {@link
2603      * #shutdown} or {@link #shutdownNow}. However this pool and any
2604      * ongoing processing are automatically terminated upon program
2605      * {@link System#exit}.  Any program that relies on asynchronous
2606      * task processing to complete before program termination should
2607      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2608      * before exit.
2609      *
2610      * @return the common pool instance
2611      * @since 1.8
2612      */
2613     public static ForkJoinPool commonPool() {
2614         // assert common != null : "static init error";
2615         return common;
2616     }
2617 
2618     // Execution methods
2619 
2620     /**
2621      * Performs the given task, returning its result upon completion.
2622      * If the computation encounters an unchecked Exception or Error,
2623      * it is rethrown as the outcome of this invocation.  Rethrown
2624      * exceptions behave in the same way as regular exceptions, but,
2625      * when possible, contain stack traces (as displayed for example
2626      * using {@code ex.printStackTrace()}) of both the current thread
2627      * as well as the thread actually encountering the exception;
2628      * minimally only the latter.
2629      *
2630      * @param task the task
2631      * @param <T> the type of the task's result
2632      * @return the task's result
2633      * @throws NullPointerException if the task is null
2634      * @throws RejectedExecutionException if the task cannot be
2635      *         scheduled for execution

2915         return ueh;
2916     }
2917 
2918     /**
2919      * Returns the targeted parallelism level of this pool.
2920      *
2921      * @return the targeted parallelism level of this pool
2922      */
2923     public int getParallelism() {
2924         int par = mode & SMASK;
2925         return (par > 0) ? par : 1;
2926     }
2927 
2928     /**
2929      * Returns the targeted parallelism level of the common pool.
2930      *
2931      * @return the targeted parallelism level of the common pool
2932      * @since 1.8
2933      */
2934     public static int getCommonPoolParallelism() {
2935         return COMMON_PARALLELISM;
2936     }
2937 
2938     /**
2939      * Returns the number of worker threads that have started but not
2940      * yet terminated.  The result returned by this method may differ
2941      * from {@link #getParallelism} when threads are created to
2942      * maintain parallelism when others are cooperatively blocked.
2943      *
2944      * @return the number of worker threads
2945      */
2946     public int getPoolSize() {
2947         return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2948     }
2949 
2950     /**
2951      * Returns {@code true} if this pool uses local first-in-first-out
2952      * scheduling mode for forked tasks that are never joined.
2953      *
2954      * @return {@code true} if this pool uses async mode
2955      */

3176             ", submissions = " + ss +
3177             "]";
3178     }
3179 
3180     /**
3181      * Possibly initiates an orderly shutdown in which previously
3182      * submitted tasks are executed, but no new tasks will be
3183      * accepted. Invocation has no effect on execution state if this
3184      * is the {@link #commonPool()}, and no additional effect if
3185      * already shut down.  Tasks that are in the process of being
3186      * submitted concurrently during the course of this method may or
3187      * may not be rejected.
3188      *
3189      * @throws SecurityException if a security manager exists and
3190      *         the caller is not permitted to modify threads
3191      *         because it does not hold {@link
3192      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3193      */
3194     public void shutdown() {
3195         checkPermission();
3196         if (this != common)
3197             tryTerminate(false, true);
3198     }
3199 
3200     /**
3201      * Possibly attempts to cancel and/or stop all tasks, and reject
3202      * all subsequently submitted tasks.  Invocation has no effect on
3203      * execution state if this is the {@link #commonPool()}, and no
3204      * additional effect if already shut down. Otherwise, tasks that
3205      * are in the process of being submitted or executed concurrently
3206      * during the course of this method may or may not be
3207      * rejected. This method cancels both existing and unexecuted
3208      * tasks, in order to permit termination in the presence of task
3209      * dependencies. So the method always returns an empty list
3210      * (unlike the case for some other Executors).
3211      *
3212      * @return an empty list
3213      * @throws SecurityException if a security manager exists and
3214      *         the caller is not permitted to modify threads
3215      *         because it does not hold {@link
3216      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3217      */
3218     public List<Runnable> shutdownNow() {
3219         checkPermission();
3220         if (this != common)
3221             tryTerminate(true, true);
3222         return Collections.emptyList();
3223     }
3224 
3225     /**
3226      * Returns {@code true} if all tasks have completed following shut down.
3227      *
3228      * @return {@code true} if all tasks have completed following shut down
3229      */
3230     public boolean isTerminated() {
3231         return (mode & TERMINATED) != 0;
3232     }
3233 
3234     /**
3235      * Returns {@code true} if the process of termination has
3236      * commenced but not yet completed.  This method may be useful for
3237      * debugging. A return of {@code true} reported a sufficient
3238      * period after shutdown may indicate that submitted tasks have
3239      * ignored or suppressed interruption, or are waiting for I/O,
3240      * causing this executor not to properly terminate. (See the

3259 
3260     /**
3261      * Blocks until all tasks have completed execution after a
3262      * shutdown request, or the timeout occurs, or the current thread
3263      * is interrupted, whichever happens first. Because the {@link
3264      * #commonPool()} never terminates until program shutdown, when
3265      * applied to the common pool, this method is equivalent to {@link
3266      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3267      *
3268      * @param timeout the maximum time to wait
3269      * @param unit the time unit of the timeout argument
3270      * @return {@code true} if this executor terminated and
3271      *         {@code false} if the timeout elapsed before termination
3272      * @throws InterruptedException if interrupted while waiting
3273      */
3274     public boolean awaitTermination(long timeout, TimeUnit unit)
3275         throws InterruptedException {
3276         ReentrantLock lock; Condition cond;
3277         long nanos = unit.toNanos(timeout);
3278         boolean terminated = false;
3279         if (this == common) {
3280             Thread t; ForkJoinWorkerThread wt; int q;
3281             if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3282                 (wt = (ForkJoinWorkerThread)t).pool == this)
3283                 q = helpQuiescePool(wt.workQueue, nanos, true);
3284             else
3285                 q = externalHelpQuiescePool(nanos, true);
3286             if (q < 0)
3287                 throw new InterruptedException();
3288         }
3289         else if (!(terminated = ((mode & TERMINATED) != 0)) &&
3290                  (lock = registrationLock) != null) {
3291             lock.lock();
3292             try {
3293                 if ((cond = termination) == null)
3294                     termination = cond = lock.newCondition();
3295                 while (!(terminated = ((mode & TERMINATED) != 0)) && nanos > 0L)
3296                     nanos = cond.awaitNanos(nanos);
3297             } finally {
3298                 lock.unlock();
3299             }

3476     @Override
3477     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3478         return new ForkJoinTask.AdaptedCallable<T>(callable);
3479     }
3480 
3481     static {
3482         try {
3483             MethodHandles.Lookup l = MethodHandles.lookup();
3484             CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
3485             MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
3486             THREADIDS = l.findVarHandle(ForkJoinPool.class, "threadIds", int.class);
3487             POOLIDS = l.findStaticVarHandle(ForkJoinPool.class, "poolIds", int.class);
3488         } catch (ReflectiveOperationException e) {
3489             throw new ExceptionInInitializerError(e);
3490         }
3491 
3492         // Reduce the risk of rare disastrous classloading in first call to
3493         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
3494         Class<?> ensureLoaded = LockSupport.class;
3495 
3496         int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3497         try {
3498             String p = System.getProperty
3499                 ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3500             if (p != null)
3501                 commonMaxSpares = Integer.parseInt(p);
3502         } catch (Exception ignore) {}
3503         COMMON_MAX_SPARES = commonMaxSpares;
3504 
3505         defaultForkJoinWorkerThreadFactory =
3506             new DefaultForkJoinWorkerThreadFactory();
3507         modifyThreadPermission = new RuntimePermission("modifyThread");
3508         @SuppressWarnings("removal")
3509         ForkJoinPool tmp = AccessController.doPrivileged(new PrivilegedAction<>() {
3510             public ForkJoinPool run() {
3511                 return new ForkJoinPool((byte)0); }});
3512         common = tmp;













3513 
3514         COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
































3515     }
3516 }

  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.security.AccessController;
  42 import java.security.AccessControlContext;
  43 import java.security.Permission;
  44 import java.security.Permissions;
  45 import java.security.PrivilegedAction;
  46 import java.security.ProtectionDomain;
  47 import java.util.ArrayList;
  48 import java.util.Collection;
  49 import java.util.Collections;
  50 import java.util.List;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.atomic.AtomicInteger;
  53 import java.util.concurrent.locks.LockSupport;
  54 import java.util.concurrent.locks.ReentrantLock;
  55 import java.util.concurrent.locks.Condition;
  56 import jdk.internal.vm.SharedThreadContainer;
  57 
  58 /**
  59  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  60  * A {@code ForkJoinPool} provides the entry point for submissions
  61  * from non-{@code ForkJoinTask} clients, as well as management and
  62  * monitoring operations.
  63  *
  64  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  65  * ExecutorService} mainly by virtue of employing
  66  * <em>work-stealing</em>: all threads in the pool attempt to find and
  67  * execute tasks submitted to the pool and/or created by other active
  68  * tasks (eventually blocking waiting for work if none exist). This
  69  * enables efficient processing when most tasks spawn other subtasks
  70  * (as do most {@code ForkJoinTask}s), as well as when many small
  71  * tasks are submitted to the pool from external clients.  Especially
  72  * when setting <em>asyncMode</em> to true in constructors, {@code
  73  * ForkJoinPool}s may also be appropriate for use with event-style
  74  * tasks that are never joined. All worker threads are initialized
  75  * with {@link Thread#isDaemon} set {@code true}.
  76  *

 950          */
 951         final int queueSize() {
 952             VarHandle.acquireFence(); // ensure fresh reads by external callers
 953             int n = top - base;
 954             return (n < 0) ? 0 : n;   // ignore transient negative
 955         }
 956 
 957         /**
 958          * Provides a more conservative estimate of whether this queue
 959          * has any tasks than does queueSize.
 960          */
 961         final boolean isEmpty() {
 962             return !((source != 0 && owner == null) || top - base > 0);
 963         }
 964 
 965         /**
 966          * Pushes a task. Call only by owner in unshared queues.
 967          *
 968          * @param task the task. Caller must ensure non-null.
 969          * @param pool (no-op if null)
 970          * @param signalOnEmpty signal a worker if queue was empty
 971          * @throws RejectedExecutionException if array cannot be resized
 972          */
 973         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean signalOnEmpty) {
 974             ForkJoinTask<?>[] a = array;
 975             int s = top++, d = s - base, cap, m; // skip insert if disabled
 976             if (a != null && pool != null && (cap = a.length) > 0) {
 977                 setSlotVolatile(a, (m = cap - 1) & s, task);
 978                 if (d == m) {
 979                     growArray();
 980                     pool.signalWork();  // signal if resized
 981                 } else {
 982                     if (signalOnEmpty && a[m & (s - 1)] == null)
 983                         pool.signalWork(); // signal if was empty
 984                 }
 985             }
 986         }
 987 
 988         final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
 989             push(task, pool, true);
 990         }
 991 
 992         /**
 993          * Pushes task to a shared queue with lock already held, and unlocks.
 994          *
 995          * @return true if caller should signal work
 996          */
 997         final boolean lockedPush(ForkJoinTask<?> task) {
 998             ForkJoinTask<?>[] a = array;
 999             int s = top++, d = s - base, cap, m;
1000             if (a != null && (cap = a.length) > 0) {
1001                 a[(m = cap - 1) & s] = task;
1002                 if (d == m)
1003                     growArray();
1004                 source = 0; // unlock
1005                 if (d == m || a[m & (s - 1)] == null)
1006                     return true;
1007             }
1008             return false;
1009         }
1010 
1011         /**

1305                 throw new ExceptionInInitializerError(e);
1306             }
1307         }
1308     }
1309 
1310     // static fields (initialized in static initializer below)
1311 
1312     /**
1313      * Creates a new ForkJoinWorkerThread. This factory is used unless
1314      * overridden in ForkJoinPool constructors.
1315      */
1316     public static final ForkJoinWorkerThreadFactory
1317         defaultForkJoinWorkerThreadFactory;
1318 
1319     /**
1320      * Permission required for callers of methods that may start or
1321      * kill threads.
1322      */
1323     static final RuntimePermission modifyThreadPermission;
1324 





















1325     /**
1326      * Sequence number for creating worker names
1327      */
1328     private static volatile int poolIds;
1329 
1330     // static configuration constants
1331 
1332     /**
1333      * Default idle timeout value (in milliseconds) for the thread
1334      * triggering quiescence to park waiting for new work
1335      */
1336     private static final long DEFAULT_KEEPALIVE = 60_000L;
1337 
1338     /**
1339      * Undershoot tolerance for idle timeouts
1340      */
1341     private static final long TIMEOUT_SLOP = 20L;
1342 










1343     /*
1344      * Bits and masks for field ctl, packed with 4 16 bit subfields:
1345      * RC: Number of released (unqueued) workers minus target parallelism
1346      * TC: Number of total workers minus target parallelism
1347      * SS: version count and status of top waiting thread
1348      * ID: poolIndex of top of Treiber stack of waiters
1349      *
1350      * When convenient, we can extract the lower 32 stack top bits
1351      * (including version bits) as sp=(int)ctl.  The offsets of counts
1352      * by the target parallelism and the positionings of fields makes
1353      * it possible to perform the most common checks via sign tests of
1354      * fields: When ac is negative, there are not enough unqueued
1355      * workers, when tc is negative, there are not enough total
1356      * workers.  When sp is non-zero, there are waiting workers.  To
1357      * deal with possibly negative fields, we use casts in and out of
1358      * "short" and/or signed shifts to maintain signedness.
1359      *
1360      * Because it occupies uppermost bits, we can add one release
1361      * count using getAndAdd of RC_UNIT, rather than CAS, when
1362      * returning from a blocked join.  Other updates entail multiple

1380     private static final int  TC_SHIFT   = 32;
1381     private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
1382     private static final long TC_MASK    = 0xffffL << TC_SHIFT;
1383     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1384 
1385     // Instance fields
1386 
1387     final long keepAlive;                // milliseconds before dropping if idle
1388     volatile long stealCount;            // collects worker nsteals
1389     int scanRover;                       // advances across pollScan calls
1390     volatile int threadIds;              // for worker thread names
1391     final int bounds;                    // min, max threads packed as shorts
1392     volatile int mode;                   // parallelism, runstate, queue mode
1393     WorkQueue[] queues;                  // main registry
1394     final ReentrantLock registrationLock;
1395     Condition termination;               // lazily constructed
1396     final String workerNamePrefix;       // null for common pool
1397     final ForkJoinWorkerThreadFactory factory;
1398     final UncaughtExceptionHandler ueh;  // per-worker UEH
1399     final Predicate<? super ForkJoinPool> saturate;
1400     final SharedThreadContainer container;
1401 
1402     @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1403     volatile long ctl;                   // main pool control
1404 
1405     // Support for atomic operations
1406     private static final VarHandle CTL;
1407     private static final VarHandle MODE;
1408     private static final VarHandle THREADIDS;
1409     private static final VarHandle POOLIDS;
1410     private boolean compareAndSetCtl(long c, long v) {
1411         return CTL.compareAndSet(this, c, v);
1412     }
1413     private long compareAndExchangeCtl(long c, long v) {
1414         return (long)CTL.compareAndExchange(this, c, v);
1415     }
1416     private long getAndAddCtl(long v) {
1417         return (long)CTL.getAndAdd(this, v);
1418     }
1419     private int getAndBitwiseOrMode(int v) {
1420         return (int)MODE.getAndBitwiseOr(this, v);

1424     }
1425     private static int getAndAddPoolIds(int x) {
1426         return (int)POOLIDS.getAndAdd(x);
1427     }
1428 
1429     // Creating, registering and deregistering workers
1430 
1431     /**
1432      * Tries to construct and start one worker. Assumes that total
1433      * count has already been incremented as a reservation.  Invokes
1434      * deregisterWorker on any failure.
1435      *
1436      * @return true if successful
1437      */
1438     private boolean createWorker() {
1439         ForkJoinWorkerThreadFactory fac = factory;
1440         Throwable ex = null;
1441         ForkJoinWorkerThread wt = null;
1442         try {
1443             if (fac != null && (wt = fac.newThread(this)) != null) {
1444                 container.start(wt);
1445                 return true;
1446             }
1447         } catch (Throwable rex) {
1448             ex = rex;
1449         }
1450         deregisterWorker(wt, ex);
1451         return false;
1452     }
1453 
1454     /**
1455      * Provides a name for ForkJoinWorkerThread constructor.
1456      */
1457     final String nextWorkerThreadName() {
1458         String prefix = workerNamePrefix;
1459         int tid = getAndAddThreadIds(1) + 1;
1460         if (prefix == null) // commonPool has no prefix
1461             prefix = "ForkJoinPool.commonPool-worker-";
1462         return prefix.concat(Integer.toString(tid));
1463     }
1464 

1642     }
1643 
1644     /**
1645      * Advances worker phase, pushes onto ctl stack, and awaits signal
1646      * or reports termination.
1647      *
1648      * @return negative if terminated, else 0
1649      */
1650     private int awaitWork(WorkQueue w) {
1651         if (w == null)
1652             return -1;                       // already terminated
1653         int phase = (w.phase + SS_SEQ) & ~UNSIGNALLED;
1654         w.phase = phase | UNSIGNALLED;       // advance phase
1655         long prevCtl = ctl, c;               // enqueue
1656         do {
1657             w.stackPred = (int)prevCtl;
1658             c = ((prevCtl - RC_UNIT) & UC_MASK) | (phase & SP_MASK);
1659         } while (prevCtl != (prevCtl = compareAndExchangeCtl(prevCtl, c)));
1660 
1661         Thread.interrupted();                // clear status
1662         //LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK)
1663         long deadline = 0L;                  // nonzero if possibly quiescent
1664         int ac = (int)(c >> RC_SHIFT), md;
1665         if ((md = mode) < 0)                 // pool is terminating
1666             return -1;
1667         else if ((md & SMASK) + ac <= 0) {
1668             boolean checkTermination = (md & SHUTDOWN) != 0;
1669             if ((deadline = System.currentTimeMillis() + keepAlive) == 0L)
1670                 deadline = 1L;               // avoid zero
1671             WorkQueue[] qs = queues;         // check for racing submission
1672             int n = (qs == null) ? 0 : qs.length;
1673             for (int i = 0; i < n; i += 2) {
1674                 WorkQueue q; ForkJoinTask<?>[] a; int cap, b;
1675                 if (ctl != c) {              // already signalled
1676                     checkTermination = false;
1677                     break;
1678                 }
1679                 else if ((q = qs[i]) != null &&
1680                          (a = q.array) != null && (cap = a.length) > 0 &&
1681                          ((b = q.base) != q.top || a[(cap - 1) & b] != null ||
1682                           q.source != 0)) {

2159         else if (q.lockedPush(task))
2160             signalWork();
2161     }
2162 
2163     /**
2164      * Pushes a possibly-external submission.
2165      */
2166     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2167         Thread t; ForkJoinWorkerThread wt; WorkQueue q;
2168         if (task == null)
2169             throw new NullPointerException();
2170         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2171             (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
2172             wt.pool == this)
2173             q.push(task, this);
2174         else
2175             externalPush(task);
2176         return task;
2177     }
2178 
2179     /**
2180      * Pushes a possibly-external submission without signalling when the queue
2181      * is empty. This method is invoked (reflectively) by the virtual thread
2182      * implementation.
2183      */
2184     private void externalLazySubmit(ForkJoinTask<?> task) {
2185         Thread t = Thread.currentThread();
2186         if (t instanceof ForkJoinWorkerThread wt && wt.pool == this) {
2187             wt.workQueue.push(task, this, false);
2188         } else {
2189             externalPush(task);
2190         }
2191     }
2192 
2193     /**
2194      * Returns common pool queue for an external thread that has
2195      * possibly ever submitted a common pool task (nonzero probe), or
2196      * null if none.
2197      */
2198     static WorkQueue commonQueue() {
2199         ForkJoinPool p; WorkQueue[] qs;
2200         int r = ThreadLocalRandom.getProbe(), n;
2201         return ((p = CommonPool.common) != null && (qs = p.queues) != null &&
2202                 (n = qs.length) > 0 && r != 0) ?
2203             qs[(n - 1) & (r << 1)] : null;
2204     }
2205 
2206     /**
2207      * Returns queue for an external thread, if one exists
2208      */
2209     final WorkQueue externalQueue() {
2210         WorkQueue[] qs;
2211         int r = ThreadLocalRandom.getProbe(), n;
2212         return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
2213             qs[(n - 1) & (r << 1)] : null;
2214     }
2215 
2216     /**
2217      * If the given executor is a ForkJoinPool, poll and execute
2218      * AsynchronousCompletionTasks from worker's queue until none are
2219      * available or blocker is released.
2220      */
2221     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {

2322                 for (int j = 1; j < n; j += 2) { // unblock other workers
2323                     if ((q = qs[j]) != null && (thread = q.owner) != null &&
2324                         !thread.isInterrupted()) {
2325                         changed = true;
2326                         try {
2327                             thread.interrupt();
2328                         } catch (Throwable ignore) {
2329                         }
2330                     }
2331                 }
2332             }
2333             ReentrantLock lock; Condition cond; // signal when no workers
2334             if (((md = mode) & TERMINATED) == 0 &&
2335                 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2336                 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2337                 (lock = registrationLock) != null) {
2338                 lock.lock();
2339                 if ((cond = termination) != null)
2340                     cond.signalAll();
2341                 lock.unlock();
2342                 container.close();
2343             }
2344             if (changed)
2345                 rescan = true;
2346             else if (rescan)
2347                 rescan = false;
2348             else
2349                 break;
2350         }
2351         return true;
2352     }
2353 
2354     // Exported methods
2355 
2356     // Constructors
2357 
2358     /**
2359      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2360      * java.lang.Runtime#availableProcessors}, using defaults for all
2361      * other parameters (see {@link #ForkJoinPool(int,
2362      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,

2521         if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
2522             throw new IllegalArgumentException();
2523         if (factory == null || unit == null)
2524             throw new NullPointerException();
2525         this.factory = factory;
2526         this.ueh = handler;
2527         this.saturate = saturate;
2528         this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2529         int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2530         int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
2531         int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p;
2532         int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2533         this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
2534         this.mode = p | (asyncMode ? FIFO : 0);
2535         this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
2536                     (((long)(-p)     << RC_SHIFT) & RC_MASK));
2537         this.registrationLock = new ReentrantLock();
2538         this.queues = new WorkQueue[size];
2539         String pid = Integer.toString(getAndAddPoolIds(1) + 1);
2540         this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";
2541 
2542         String name = "ForkJoinPool-" + pid;
2543         this.container = SharedThreadContainer.create(name);
2544     }
2545 
2546     // helper method for commonPool constructor
2547     private static Object newInstanceFromSystemProperty(String property)
2548         throws ReflectiveOperationException {
2549         String className = System.getProperty(property);
2550         return (className == null)
2551             ? null
2552             : ClassLoader.getSystemClassLoader().loadClass(className)
2553             .getConstructor().newInstance();
2554     }
2555 
2556     /**
2557      * Constructor for common pool using parameters possibly
2558      * overridden by system properties
2559      */
2560     private ForkJoinPool(byte forCommonPoolOnly) {
2561         int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
2562         ForkJoinWorkerThreadFactory fac = null;
2563         UncaughtExceptionHandler handler = null;
2564         try {  // ignore exceptions in accessing/parsing properties
2565             fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
2566                 "java.util.concurrent.ForkJoinPool.common.threadFactory");
2567             handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
2568                 "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
2569             String pp = System.getProperty
2570                 ("java.util.concurrent.ForkJoinPool.common.parallelism");
2571             if (pp != null)
2572                 parallelism = Integer.parseInt(pp);
2573         } catch (Exception ignore) {
2574         }
2575         this.ueh = handler;
2576         this.keepAlive = DEFAULT_KEEPALIVE;
2577         this.saturate = null;
2578         this.workerNamePrefix = null;
2579         int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
2580         this.mode = p;
2581         if (p > 0) {
2582             size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2583             this.bounds = ((1 - p) & SMASK) | (CommonPool.COMMON_MAX_SPARES << SWIDTH);
2584             this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2585                         (((long)(-p) << RC_SHIFT) & RC_MASK));
2586         } else {  // zero min, max, spare counts, 1 slot
2587             size = 1;
2588             this.bounds = 0;
2589             this.ctl = 0L;
2590         }
2591         this.factory = (fac != null) ? fac :
2592             new DefaultCommonPoolForkJoinWorkerThreadFactory();
2593         this.queues = new WorkQueue[size];
2594         this.registrationLock = new ReentrantLock();
2595 
2596         String name = "ForkJoinPool.commonPool";
2597         this.container = SharedThreadContainer.create(name);
2598     }
2599 
2600     /**
2601      * Returns the common pool instance. This pool is statically
2602      * constructed; its run state is unaffected by attempts to {@link
2603      * #shutdown} or {@link #shutdownNow}. However this pool and any
2604      * ongoing processing are automatically terminated upon program
2605      * {@link System#exit}.  Any program that relies on asynchronous
2606      * task processing to complete before program termination should
2607      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2608      * before exit.
2609      *
2610      * @return the common pool instance
2611      * @since 1.8
2612      */
2613     public static ForkJoinPool commonPool() {
2614         // assert common != null : "static init error";
2615         return CommonPool.common;
2616     }
2617 
2618     // Execution methods
2619 
2620     /**
2621      * Performs the given task, returning its result upon completion.
2622      * If the computation encounters an unchecked Exception or Error,
2623      * it is rethrown as the outcome of this invocation.  Rethrown
2624      * exceptions behave in the same way as regular exceptions, but,
2625      * when possible, contain stack traces (as displayed for example
2626      * using {@code ex.printStackTrace()}) of both the current thread
2627      * as well as the thread actually encountering the exception;
2628      * minimally only the latter.
2629      *
2630      * @param task the task
2631      * @param <T> the type of the task's result
2632      * @return the task's result
2633      * @throws NullPointerException if the task is null
2634      * @throws RejectedExecutionException if the task cannot be
2635      *         scheduled for execution

2915         return ueh;
2916     }
2917 
2918     /**
2919      * Returns the targeted parallelism level of this pool.
2920      *
2921      * @return the targeted parallelism level of this pool
2922      */
2923     public int getParallelism() {
2924         int par = mode & SMASK;
2925         return (par > 0) ? par : 1;
2926     }
2927 
2928     /**
2929      * Returns the targeted parallelism level of the common pool.
2930      *
2931      * @return the targeted parallelism level of the common pool
2932      * @since 1.8
2933      */
2934     public static int getCommonPoolParallelism() {
2935         return CommonPool.COMMON_PARALLELISM;
2936     }
2937 
2938     /**
2939      * Returns the number of worker threads that have started but not
2940      * yet terminated.  The result returned by this method may differ
2941      * from {@link #getParallelism} when threads are created to
2942      * maintain parallelism when others are cooperatively blocked.
2943      *
2944      * @return the number of worker threads
2945      */
2946     public int getPoolSize() {
2947         return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2948     }
2949 
2950     /**
2951      * Returns {@code true} if this pool uses local first-in-first-out
2952      * scheduling mode for forked tasks that are never joined.
2953      *
2954      * @return {@code true} if this pool uses async mode
2955      */

3176             ", submissions = " + ss +
3177             "]";
3178     }
3179 
3180     /**
3181      * Possibly initiates an orderly shutdown in which previously
3182      * submitted tasks are executed, but no new tasks will be
3183      * accepted. Invocation has no effect on execution state if this
3184      * is the {@link #commonPool()}, and no additional effect if
3185      * already shut down.  Tasks that are in the process of being
3186      * submitted concurrently during the course of this method may or
3187      * may not be rejected.
3188      *
3189      * @throws SecurityException if a security manager exists and
3190      *         the caller is not permitted to modify threads
3191      *         because it does not hold {@link
3192      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3193      */
3194     public void shutdown() {
3195         checkPermission();
3196         if (this != CommonPool.common)
3197             tryTerminate(false, true);
3198     }
3199 
3200     /**
3201      * Possibly attempts to cancel and/or stop all tasks, and reject
3202      * all subsequently submitted tasks.  Invocation has no effect on
3203      * execution state if this is the {@link #commonPool()}, and no
3204      * additional effect if already shut down. Otherwise, tasks that
3205      * are in the process of being submitted or executed concurrently
3206      * during the course of this method may or may not be
3207      * rejected. This method cancels both existing and unexecuted
3208      * tasks, in order to permit termination in the presence of task
3209      * dependencies. So the method always returns an empty list
3210      * (unlike the case for some other Executors).
3211      *
3212      * @return an empty list
3213      * @throws SecurityException if a security manager exists and
3214      *         the caller is not permitted to modify threads
3215      *         because it does not hold {@link
3216      *         java.lang.RuntimePermission}{@code ("modifyThread")}
3217      */
3218     public List<Runnable> shutdownNow() {
3219         checkPermission();
3220         if (this != CommonPool.common)
3221             tryTerminate(true, true);
3222         return Collections.emptyList();
3223     }
3224 
3225     /**
3226      * Returns {@code true} if all tasks have completed following shut down.
3227      *
3228      * @return {@code true} if all tasks have completed following shut down
3229      */
3230     public boolean isTerminated() {
3231         return (mode & TERMINATED) != 0;
3232     }
3233 
3234     /**
3235      * Returns {@code true} if the process of termination has
3236      * commenced but not yet completed.  This method may be useful for
3237      * debugging. A return of {@code true} reported a sufficient
3238      * period after shutdown may indicate that submitted tasks have
3239      * ignored or suppressed interruption, or are waiting for I/O,
3240      * causing this executor not to properly terminate. (See the

3259 
3260     /**
3261      * Blocks until all tasks have completed execution after a
3262      * shutdown request, or the timeout occurs, or the current thread
3263      * is interrupted, whichever happens first. Because the {@link
3264      * #commonPool()} never terminates until program shutdown, when
3265      * applied to the common pool, this method is equivalent to {@link
3266      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
3267      *
3268      * @param timeout the maximum time to wait
3269      * @param unit the time unit of the timeout argument
3270      * @return {@code true} if this executor terminated and
3271      *         {@code false} if the timeout elapsed before termination
3272      * @throws InterruptedException if interrupted while waiting
3273      */
3274     public boolean awaitTermination(long timeout, TimeUnit unit)
3275         throws InterruptedException {
3276         ReentrantLock lock; Condition cond;
3277         long nanos = unit.toNanos(timeout);
3278         boolean terminated = false;
3279         if (this == CommonPool.common) {
3280             Thread t; ForkJoinWorkerThread wt; int q;
3281             if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3282                 (wt = (ForkJoinWorkerThread)t).pool == this)
3283                 q = helpQuiescePool(wt.workQueue, nanos, true);
3284             else
3285                 q = externalHelpQuiescePool(nanos, true);
3286             if (q < 0)
3287                 throw new InterruptedException();
3288         }
3289         else if (!(terminated = ((mode & TERMINATED) != 0)) &&
3290                  (lock = registrationLock) != null) {
3291             lock.lock();
3292             try {
3293                 if ((cond = termination) == null)
3294                     termination = cond = lock.newCondition();
3295                 while (!(terminated = ((mode & TERMINATED) != 0)) && nanos > 0L)
3296                     nanos = cond.awaitNanos(nanos);
3297             } finally {
3298                 lock.unlock();
3299             }

3476     @Override
3477     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3478         return new ForkJoinTask.AdaptedCallable<T>(callable);
3479     }
3480 
3481     static {
3482         try {
3483             MethodHandles.Lookup l = MethodHandles.lookup();
3484             CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
3485             MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
3486             THREADIDS = l.findVarHandle(ForkJoinPool.class, "threadIds", int.class);
3487             POOLIDS = l.findStaticVarHandle(ForkJoinPool.class, "poolIds", int.class);
3488         } catch (ReflectiveOperationException e) {
3489             throw new ExceptionInInitializerError(e);
3490         }
3491 
3492         // Reduce the risk of rare disastrous classloading in first call to
3493         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
3494         Class<?> ensureLoaded = LockSupport.class;
3495 
3496         defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory();










3497         modifyThreadPermission = new RuntimePermission("modifyThread");
3498     }
3499 
3500     static class CommonPool {
3501         /**
3502          * Common (static) pool. Non-null for public use unless a static
3503          * construction exception, but internal usages null-check on use
3504          * to paranoically avoid potential initialization circularities
3505          * as well as to simplify generated code.
3506          */
3507         static final ForkJoinPool common;
3508 
3509         /**
3510          * Common pool parallelism. To allow simpler use and management
3511          * when common pool threads are disabled, we allow the underlying
3512          * common.parallelism field to be zero, but in that case still report
3513          * parallelism as 1 to reflect resulting caller-runs mechanics.
3514          */
3515         static final int COMMON_PARALLELISM;
3516 
3517         /**
3518          * Limit on spare thread construction in tryCompensate.
3519          */
3520         private static final int COMMON_MAX_SPARES;
3521 
3522         /**
3523          * The default value for COMMON_MAX_SPARES.  Overridable using the
3524          * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
3525          * property.  The default value is far in excess of normal
3526          * requirements, but also far short of MAX_CAP and typical OS
3527          * thread limits, so allows JVMs to catch misuse/abuse before
3528          * running out of resources needed to do so.
3529          */
3530         private static final int DEFAULT_COMMON_MAX_SPARES = 256;
3531 
3532         static {
3533             int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
3534             try {
3535                 String p = System.getProperty
3536                     ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
3537                 if (p != null)
3538                     commonMaxSpares = Integer.parseInt(p);
3539             } catch (Exception ignore) {}
3540             COMMON_MAX_SPARES = commonMaxSpares;
3541 
3542             @SuppressWarnings("removal")
3543             ForkJoinPool tmp = AccessController.doPrivileged(new PrivilegedAction<>() {
3544                 public ForkJoinPool run() {
3545                     return new ForkJoinPool((byte)0); }});
3546             common = tmp;
3547 
3548             COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
3549         }
3550     }
3551 }
< prev index next >