< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page

  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.security.AccessController;
  42 import java.security.AccessControlContext;
  43 import java.security.Permission;
  44 import java.security.Permissions;
  45 import java.security.PrivilegedAction;
  46 import java.security.ProtectionDomain;
  47 import java.util.ArrayList;
  48 import java.util.Collection;
  49 import java.util.Collections;
  50 import java.util.List;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.atomic.AtomicInteger;
  53 import java.util.concurrent.locks.LockSupport;
  54 import java.util.concurrent.locks.ReentrantLock;
  55 import java.util.concurrent.locks.Condition;


  56 
  57 /**
  58  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  59  * A {@code ForkJoinPool} provides the entry point for submissions
  60  * from non-{@code ForkJoinTask} clients, as well as management and
  61  * monitoring operations.
  62  *
  63  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  64  * ExecutorService} mainly by virtue of employing
  65  * <em>work-stealing</em>: all threads in the pool attempt to find and
  66  * execute tasks submitted to the pool and/or created by other active
  67  * tasks (eventually blocking waiting for work if none exist). This
  68  * enables efficient processing when most tasks spawn other subtasks
  69  * (as do most {@code ForkJoinTask}s), as well as when many small
  70  * tasks are submitted to the pool from external clients.  Especially
  71  * when setting <em>asyncMode</em> to true in constructors, {@code
  72  * ForkJoinPool}s may also be appropriate for use with event-style
  73  * tasks that are never joined. All worker threads are initialized
  74  * with {@link Thread#isDaemon} set {@code true}.
  75  *

 949          */
 950         final int queueSize() {   // estimate of queued tasks: top - base, never reported below zero
 951             VarHandle.acquireFence(); // ensure fresh reads by external callers
 952             int n = top - base;       // distance between the two indices as read at this instant
 953             return (n < 0) ? 0 : n;   // ignore transient negative
 954         }
 955 
 956         /**
 957          * Provides a more conservative estimate of whether this queue
 958          * has any tasks than does queueSize.
              *
              * <p>The queue is reported as non-empty when {@code top - base > 0},
              * and also when {@code source != 0} while {@code owner == null}.
              * NOTE(review): for ownerless queues a nonzero {@code source} looks
              * like the "locked" state (lockedPush releases with {@code source = 0});
              * confirm against the lock-acquire path, which is not visible here.
              *
              * @return {@code true} only if neither of the conditions above holds
 959          */
 960         final boolean isEmpty() {
 961             return !((source != 0 && owner == null) || top - base > 0);
 962         }
 963 
 964         /**
 965          * Pushes a task. Call only by owner in unshared queues.
              *
              * <p>The top index is advanced even when the insert is skipped
              * (null or zero-length array, or null pool). After an insert the
              * pool is signalled when the array was grown or when the slot just
              * below the new task held null.
 966          *
 967          * @param task the task. Caller must ensure non-null.
 968          * @param pool (no-op if null)

 969          * @throws RejectedExecutionException if array cannot be resized
 970          */
 971         final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
 972             ForkJoinTask<?>[] a = array;
 973             int s = top++, d = s - base, cap, m; // skip insert if disabled
 974             if (a != null && pool != null && (cap = a.length) > 0) {
 975                 setSlotVolatile(a, (m = cap - 1) & s, task); // m masks s into the array; assumes cap is a power of two -- TODO confirm
 976                 if (d == m)                                  // cap - 1 entries were already queued before this push
 977                     growArray();
 978                 if (d == m || a[m & (s - 1)] == null)        // a is still the array captured before any growArray call
 979                     pool.signalWork(); // signal if was empty or resized



 980             }
 981         }
 982 




 983         /**
 984          * Pushes task to a shared queue with lock already held, and unlocks.
              *
              * <p>The unlock is the write {@code source = 0}. NOTE(review): that
              * write sits inside the array check, so when the array is null or
              * zero-length this method returns false without touching
              * {@code source}; confirm that callers handle that path.
 985          *
              * @param task the task to store; it is not null-checked here
 986          * @return true if caller should signal work
 987          */
 988         final boolean lockedPush(ForkJoinTask<?> task) {
 989             ForkJoinTask<?>[] a = array;
 990             int s = top++, d = s - base, cap, m;   // top advances even if the store below is skipped
 991             if (a != null && (cap = a.length) > 0) {
 992                 a[(m = cap - 1) & s] = task;       // plain array store (push uses setSlotVolatile instead)
 993                 if (d == m)                        // cap - 1 entries were already queued before this push
 994                     growArray();
 995                 source = 0; // unlock
 996                 if (d == m || a[m & (s - 1)] == null) // grown, or the slot below the new task held null
 997                     return true;
 998             }
 999             return false;
1000         }
1001 
1002         /**

1402     private static final int  TC_SHIFT   = 32;                         // bit offset of the TC field inside ctl (TC presumably "total count" -- TODO confirm)
1403     private static final long TC_UNIT    = 0x0001L << TC_SHIFT;        // the value 1 expressed in the TC field
1404     private static final long TC_MASK    = 0xffffL << TC_SHIFT;        // selects the 16-bit TC field
1405     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of the 16-bit TC field (bit 47 of ctl)
1406 
1407     // Instance fields
1408 
1409     final long keepAlive;                // milliseconds before dropping if idle
1410     volatile long stealCount;            // collects worker nsteals
1411     int scanRover;                       // advances across pollScan calls
1412     volatile int threadIds;              // for worker thread names
1413     final int bounds;                    // min, max threads packed as shorts
1414     volatile int mode;                   // parallelism, runstate, queue mode
1415     WorkQueue[] queues;                  // main registry
1416     final ReentrantLock registrationLock; // also held while signalling the termination condition
1417     Condition termination;               // lazily constructed
1418     final String workerNamePrefix;       // null for common pool
1419     final ForkJoinWorkerThreadFactory factory; // source of new worker threads (see createWorker)
1420     final UncaughtExceptionHandler ueh;  // per-worker UEH
1421     final Predicate<? super ForkJoinPool> saturate; // as passed to the constructor; null for common pool

1422 
1423     @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1424     volatile long ctl;                   // main pool control
1425 
1426     // Support for atomic operations
         // (these final handles have no initializer here, so they are assigned in a static initializer outside this view)
1427     private static final VarHandle CTL;
1428     private static final VarHandle MODE;
1429     private static final VarHandle THREADIDS;
1430     private static final VarHandle POOLIDS;
1431     private boolean compareAndSetCtl(long c, long v) { // atomic compare-and-set through the CTL handle (presumably bound to the ctl field -- binding not visible here)
1432         return CTL.compareAndSet(this, c, v);          // true only if the value was c and has been replaced by v
1433     }
1434     private long compareAndExchangeCtl(long c, long v) { // like compareAndSetCtl, but reports the value found instead of a boolean
1435         return (long)CTL.compareAndExchange(this, c, v); // witness value; it equals c exactly when the exchange succeeded
1436     }
1437     private long getAndAddCtl(long v) {        // atomically adds v through the CTL handle
1438         return (long)CTL.getAndAdd(this, v);   // returns the value held before the addition
1439     }
1440     private int getAndBitwiseOrMode(int v) {
1441         return (int)MODE.getAndBitwiseOr(this, v);

1445     }
1446     private static int getAndAddPoolIds(int x) { // static counter: POOLIDS is invoked without a receiver object
1447         return (int)POOLIDS.getAndAdd(x);        // returns the value held before x was added
1448     }
1449 
1450     // Creating, registering and deregistering workers
1451 
1452     /**
1453      * Tries to construct and start one worker. Assumes that total
1454      * count has already been incremented as a reservation.  Invokes
1455      * deregisterWorker on any failure.
          *
          * <p>Failure here means any of: the factory is null, the factory
          * returned null, or creating/starting the thread threw. In the
          * last case the Throwable is not rethrown; it is handed to
          * deregisterWorker together with the (possibly null) thread.
1456      *
1457      * @return true if successful
1458      */
1459     private boolean createWorker() {
1460         ForkJoinWorkerThreadFactory fac = factory;
1461         Throwable ex = null;
1462         ForkJoinWorkerThread wt = null;
1463         try {
1464             if (fac != null && (wt = fac.newThread(this)) != null) {
1465                 wt.start();           // the new worker is started directly
1466                 return true;
1467             }
1468         } catch (Throwable rex) {
1469             ex = rex;                 // kept only to pass on to deregisterWorker
1470         }
1471         deregisterWorker(wt, ex);     // wt is null unless newThread returned a thread whose start then failed
1472         return false;
1473     }
1474 
1475     /**
1476      * Provides a name for ForkJoinWorkerThread constructor.
          *
          * @return workerNamePrefix (or "ForkJoinPool.commonPool-worker-" when
          *         the prefix is null) followed by the next thread id in decimal
1477      */
1478     final String nextWorkerThreadName() {
1479         String prefix = workerNamePrefix;
1480         int tid = getAndAddThreadIds(1) + 1; // helper not visible here; presumably returns the pre-increment threadIds -- TODO confirm
1481         if (prefix == null) // commonPool has no prefix
1482             prefix = "ForkJoinPool.commonPool-worker-";
1483         return prefix.concat(Integer.toString(tid));
1484     }
1485 

2180         else if (q.lockedPush(task))
2181             signalWork();
2182     }
2183 
2184     /**
2185      * Pushes a possibly-external submission.
          *
          * <p>If the caller is a ForkJoinWorkerThread of this pool with a
          * non-null work queue, the task goes onto that queue via push;
          * every other caller is routed through externalPush.
          *
          * @param task the task to submit
          * @return the task argument, unchanged
          * @throws NullPointerException if task is null
2186      */
2187     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2188         Thread t; ForkJoinWorkerThread wt; WorkQueue q;
2189         if (task == null)
2190             throw new NullPointerException();
2191         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2192             (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
2193             wt.pool == this)           // a worker belonging to another pool counts as external
2194             q.push(task, this);
2195         else
2196             externalPush(task);
2197         return task;
2198     }
2199 

















2200     /**
2201      * Returns common pool queue for an external thread that has
2202      * possibly ever submitted a common pool task (nonzero probe), or
2203      * null if none.
          *
          * <p>Null is also the result when the common pool or its queues
          * array is not set, the array is empty, or the selected slot
          * itself holds null.
2204      */
2205     static WorkQueue commonQueue() {
2206         ForkJoinPool p; WorkQueue[] qs;
2207         int r = ThreadLocalRandom.getProbe(), n;   // a zero probe always yields null below
2208         return ((p = common) != null && (qs = p.queues) != null &&
2209                 (n = qs.length) > 0 && r != 0) ?
2210             qs[(n - 1) & (r << 1)] : null;         // r << 1 clears bit 0, so only even-indexed slots are selected
2211     }
2212 
2213     /**
2214      * Returns queue for an external thread, if one exists.
2215      */
2216     final WorkQueue externalQueue() {
2217         WorkQueue[] qs;
2218         int r = ThreadLocalRandom.getProbe(), n;
2219         return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?

2329                 for (int j = 1; j < n; j += 2) { // unblock other workers
2330                     if ((q = qs[j]) != null && (thread = q.owner) != null &&
2331                         !thread.isInterrupted()) {
2332                         changed = true;
2333                         try {
2334                             thread.interrupt();
2335                         } catch (Throwable ignore) {
2336                         }
2337                     }
2338                 }
2339             }
2340             ReentrantLock lock; Condition cond; // signal when no workers
2341             if (((md = mode) & TERMINATED) == 0 &&
2342                 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2343                 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2344                 (lock = registrationLock) != null) {
2345                 lock.lock();
2346                 if ((cond = termination) != null)
2347                     cond.signalAll();
2348                 lock.unlock();

2349             }
2350             if (changed)
2351                 rescan = true;
2352             else if (rescan)
2353                 rescan = false;
2354             else
2355                 break;
2356         }
2357         return true;
2358     }
2359 
2360     // Exported methods
2361 
2362     // Constructors
2363 
2364     /**
2365      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2366      * java.lang.Runtime#availableProcessors}, using defaults for all
2367      * other parameters (see {@link #ForkJoinPool(int,
2368      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,

2527         if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
2528             throw new IllegalArgumentException();
2529         if (factory == null || unit == null)
2530             throw new NullPointerException();
2531         this.factory = factory;
2532         this.ueh = handler;
2533         this.saturate = saturate;
2534         this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2535         int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2536         int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
2537         int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p;
2538         int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2539         this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
2540         this.mode = p | (asyncMode ? FIFO : 0);
2541         this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
2542                     (((long)(-p)     << RC_SHIFT) & RC_MASK));
2543         this.registrationLock = new ReentrantLock();
2544         this.queues = new WorkQueue[size];
2545         String pid = Integer.toString(getAndAddPoolIds(1) + 1);
2546         this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";



2547     }
2548 
2549     // helper method for commonPool constructor
         // Reads the named system property as a class name. When it is set, the class
         // is loaded with the system class loader and instantiated through its public
         // no-argument constructor; when it is not set, the result is null.
         // Reflective failures propagate to the caller as ReflectiveOperationException.
2550     private static Object newInstanceFromSystemProperty(String property)
2551         throws ReflectiveOperationException {
2552         String className = System.getProperty(property);
2553         return (className == null)
2554             ? null
2555             : ClassLoader.getSystemClassLoader().loadClass(className)
2556             .getConstructor().newInstance();
2557     }
2558 
2559     /**
2560      * Constructor for common pool using parameters possibly
2561      * overridden by system properties
2562      */
2563     private ForkJoinPool(byte forCommonPoolOnly) {
2564         int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
2565         ForkJoinWorkerThreadFactory fac = null;
2566         UncaughtExceptionHandler handler = null;

2578         this.ueh = handler;
2579         this.keepAlive = DEFAULT_KEEPALIVE;
2580         this.saturate = null;
2581         this.workerNamePrefix = null;
2582         int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
2583         this.mode = p;
2584         if (p > 0) {
2585             size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2586             this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2587             this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2588                         (((long)(-p) << RC_SHIFT) & RC_MASK));
2589         } else {  // zero min, max, spare counts, 1 slot
2590             size = 1;
2591             this.bounds = 0;
2592             this.ctl = 0L;
2593         }
2594         this.factory = (fac != null) ? fac :
2595             new DefaultCommonPoolForkJoinWorkerThreadFactory();
2596         this.queues = new WorkQueue[size];
2597         this.registrationLock = new ReentrantLock();



2598     }
2599 
2600     /**
2601      * Returns the common pool instance. This pool is statically
2602      * constructed; its run state is unaffected by attempts to {@link
2603      * #shutdown} or {@link #shutdownNow}. However this pool and any
2604      * ongoing processing are automatically terminated upon program
2605      * {@link System#exit}.  Any program that relies on asynchronous
2606      * task processing to complete before program termination should
2607      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2608      * before exit.
2609      *
2610      * @return the common pool instance
2611      * @since 1.8
2612      */
2613     public static ForkJoinPool commonPool() {
2614         // assert common != null : "static init error";
2615         return common;   // plain read of the static field; no check or lazy creation happens here
2616     }
2617 

  36 package java.util.concurrent;
  37 
  38 import java.lang.Thread.UncaughtExceptionHandler;
  39 import java.lang.invoke.MethodHandles;
  40 import java.lang.invoke.VarHandle;
  41 import java.security.AccessController;
  42 import java.security.AccessControlContext;
  43 import java.security.Permission;
  44 import java.security.Permissions;
  45 import java.security.PrivilegedAction;
  46 import java.security.ProtectionDomain;
  47 import java.util.ArrayList;
  48 import java.util.Collection;
  49 import java.util.Collections;
  50 import java.util.List;
  51 import java.util.function.Predicate;
  52 import java.util.concurrent.atomic.AtomicInteger;
  53 import java.util.concurrent.locks.LockSupport;
  54 import java.util.concurrent.locks.ReentrantLock;
  55 import java.util.concurrent.locks.Condition;
  56 import jdk.internal.misc.VirtualThreads;
  57 import jdk.internal.vm.SharedThreadContainer;
  58 
  59 /**
  60  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  61  * A {@code ForkJoinPool} provides the entry point for submissions
  62  * from non-{@code ForkJoinTask} clients, as well as management and
  63  * monitoring operations.
  64  *
  65  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
  66  * ExecutorService} mainly by virtue of employing
  67  * <em>work-stealing</em>: all threads in the pool attempt to find and
  68  * execute tasks submitted to the pool and/or created by other active
  69  * tasks (eventually blocking waiting for work if none exist). This
  70  * enables efficient processing when most tasks spawn other subtasks
  71  * (as do most {@code ForkJoinTask}s), as well as when many small
  72  * tasks are submitted to the pool from external clients.  Especially
  73  * when setting <em>asyncMode</em> to true in constructors, {@code
  74  * ForkJoinPool}s may also be appropriate for use with event-style
  75  * tasks that are never joined. All worker threads are initialized
  76  * with {@link Thread#isDaemon} set {@code true}.
  77  *

 951          */
 952         final int queueSize() {   // estimate of queued tasks: top - base, never reported below zero
 953             VarHandle.acquireFence(); // ensure fresh reads by external callers
 954             int n = top - base;       // distance between the two indices as read at this instant
 955             return (n < 0) ? 0 : n;   // ignore transient negative
 956         }
 957 
 958         /**
 959          * Provides a more conservative estimate of whether this queue
 960          * has any tasks than does queueSize.
              *
              * <p>The queue is reported as non-empty when {@code top - base > 0},
              * and also when {@code source != 0} while {@code owner == null}.
              * NOTE(review): for ownerless queues a nonzero {@code source} looks
              * like the "locked" state (lockedPush releases with {@code source = 0});
              * confirm against the lock-acquire path, which is not visible here.
              *
              * @return {@code true} only if neither of the conditions above holds
 961          */
 962         final boolean isEmpty() {
 963             return !((source != 0 && owner == null) || top - base > 0);
 964         }
 965 
 966         /**
 967          * Pushes a task. Call only by owner in unshared queues.
              *
              * <p>The top index is advanced even when the insert is skipped
              * (null or zero-length array, or null pool). After an insert the
              * pool is always signalled if the array had to be grown; the
              * "was empty" signal is sent only when signalOnEmpty is true.
 968          *
 969          * @param task the task. Caller must ensure non-null.
 970          * @param pool (no-op if null)
 971          * @param signalOnEmpty signal a worker if queue was empty
 972          * @throws RejectedExecutionException if array cannot be resized
 973          */
 974         final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean signalOnEmpty) {
 975             ForkJoinTask<?>[] a = array;
 976             int s = top++, d = s - base, cap, m; // skip insert if disabled
 977             if (a != null && pool != null && (cap = a.length) > 0) {
 978                 setSlotVolatile(a, (m = cap - 1) & s, task); // m masks s into the array; assumes cap is a power of two -- TODO confirm
 979                 if (d == m) {                                // cap - 1 entries were already queued before this push
 980                     growArray();
 981                     pool.signalWork();  // signal if resized
 982                 } else {
 983                     if (signalOnEmpty && a[m & (s - 1)] == null) // slot just below the new task held null
 984                         pool.signalWork(); // signal if was empty
 985                 }
 986             }
 987         }
 988 
 989         final void push(ForkJoinTask<?> task, ForkJoinPool pool) { // two-argument form: same as the three-argument push with signalOnEmpty = true
 990             push(task, pool, true);
 991         }
 992 
 993         /**
 994          * Pushes task to a shared queue with lock already held, and unlocks.
              *
              * <p>The unlock is the write {@code source = 0}. NOTE(review): that
              * write sits inside the array check, so when the array is null or
              * zero-length this method returns false without touching
              * {@code source}; confirm that callers handle that path.
 995          *
              * @param task the task to store; it is not null-checked here
 996          * @return true if caller should signal work
 997          */
 998         final boolean lockedPush(ForkJoinTask<?> task) {
 999             ForkJoinTask<?>[] a = array;
1000             int s = top++, d = s - base, cap, m;   // top advances even if the store below is skipped
1001             if (a != null && (cap = a.length) > 0) {
1002                 a[(m = cap - 1) & s] = task;       // plain array store (push uses setSlotVolatile instead)
1003                 if (d == m)                        // cap - 1 entries were already queued before this push
1004                     growArray();
1005                 source = 0; // unlock
1006                 if (d == m || a[m & (s - 1)] == null) // grown, or the slot below the new task held null
1007                     return true;
1008             }
1009             return false;
1010         }
1011 
1012         /**

1412     private static final int  TC_SHIFT   = 32;                         // bit offset of the TC field inside ctl (TC presumably "total count" -- TODO confirm)
1413     private static final long TC_UNIT    = 0x0001L << TC_SHIFT;        // the value 1 expressed in the TC field
1414     private static final long TC_MASK    = 0xffffL << TC_SHIFT;        // selects the 16-bit TC field
1415     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of the 16-bit TC field (bit 47 of ctl)
1416 
1417     // Instance fields
1418 
1419     final long keepAlive;                // milliseconds before dropping if idle
1420     volatile long stealCount;            // collects worker nsteals
1421     int scanRover;                       // advances across pollScan calls
1422     volatile int threadIds;              // for worker thread names
1423     final int bounds;                    // min, max threads packed as shorts
1424     volatile int mode;                   // parallelism, runstate, queue mode
1425     WorkQueue[] queues;                  // main registry
1426     final ReentrantLock registrationLock; // also held while signalling the termination condition
1427     Condition termination;               // lazily constructed
1428     final String workerNamePrefix;       // null for common pool
1429     final ForkJoinWorkerThreadFactory factory; // source of new worker threads (see createWorker)
1430     final UncaughtExceptionHandler ueh;  // per-worker UEH
1431     final Predicate<? super ForkJoinPool> saturate; // as passed to the constructor; null for common pool
1432     final SharedThreadContainer container; // created per pool in the constructors; workers are started through it and it is closed on termination
1433 
1434     @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1435     volatile long ctl;                   // main pool control
1436 
1437     // Support for atomic operations
         // (these final handles have no initializer here, so they are assigned in a static initializer outside this view)
1438     private static final VarHandle CTL;
1439     private static final VarHandle MODE;
1440     private static final VarHandle THREADIDS;
1441     private static final VarHandle POOLIDS;
1442     private boolean compareAndSetCtl(long c, long v) { // atomic compare-and-set through the CTL handle (presumably bound to the ctl field -- binding not visible here)
1443         return CTL.compareAndSet(this, c, v);          // true only if the value was c and has been replaced by v
1444     }
1445     private long compareAndExchangeCtl(long c, long v) { // like compareAndSetCtl, but reports the value found instead of a boolean
1446         return (long)CTL.compareAndExchange(this, c, v); // witness value; it equals c exactly when the exchange succeeded
1447     }
1448     private long getAndAddCtl(long v) {        // atomically adds v through the CTL handle
1449         return (long)CTL.getAndAdd(this, v);   // returns the value held before the addition
1450     }
1451     private int getAndBitwiseOrMode(int v) {
1452         return (int)MODE.getAndBitwiseOr(this, v);

1456     }
1457     private static int getAndAddPoolIds(int x) { // static counter: POOLIDS is invoked without a receiver object
1458         return (int)POOLIDS.getAndAdd(x);        // returns the value held before x was added
1459     }
1460 
1461     // Creating, registering and deregistering workers
1462 
1463     /**
1464      * Tries to construct and start one worker. Assumes that total
1465      * count has already been incremented as a reservation.  Invokes
1466      * deregisterWorker on any failure.
          *
          * <p>Failure here means any of: the factory is null, the factory
          * returned null, or creating/starting the thread threw. In the
          * last case the Throwable is not rethrown; it is handed to
          * deregisterWorker together with the (possibly null) thread.
1467      *
1468      * @return true if successful
1469      */
1470     private boolean createWorker() {
1471         ForkJoinWorkerThreadFactory fac = factory;
1472         Throwable ex = null;
1473         ForkJoinWorkerThread wt = null;
1474         try {
1475             if (fac != null && (wt = fac.newThread(this)) != null) {
1476                 container.start(wt);  // started through this pool's SharedThreadContainer, not by calling wt.start() here
1477                 return true;
1478             }
1479         } catch (Throwable rex) {
1480             ex = rex;                 // kept only to pass on to deregisterWorker
1481         }
1482         deregisterWorker(wt, ex);     // wt is null unless newThread returned a thread whose start then failed
1483         return false;
1484     }
1485 
1486     /**
1487      * Provides a name for ForkJoinWorkerThread constructor.
          *
          * @return workerNamePrefix (or "ForkJoinPool.commonPool-worker-" when
          *         the prefix is null) followed by the next thread id in decimal
1488      */
1489     final String nextWorkerThreadName() {
1490         String prefix = workerNamePrefix;
1491         int tid = getAndAddThreadIds(1) + 1; // helper not visible here; presumably returns the pre-increment threadIds -- TODO confirm
1492         if (prefix == null) // commonPool has no prefix
1493             prefix = "ForkJoinPool.commonPool-worker-";
1494         return prefix.concat(Integer.toString(tid));
1495     }
1496 

2191         else if (q.lockedPush(task))
2192             signalWork();
2193     }
2194 
2195     /**
2196      * Pushes a possibly-external submission.
          *
          * <p>If the caller is a ForkJoinWorkerThread of this pool with a
          * non-null work queue, the task goes onto that queue via the
          * two-argument push; every other caller is routed through
          * externalPush.
          *
          * @param task the task to submit
          * @return the task argument, unchanged
          * @throws NullPointerException if task is null
2197      */
2198     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
2199         Thread t; ForkJoinWorkerThread wt; WorkQueue q;
2200         if (task == null)
2201             throw new NullPointerException();
2202         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
2203             (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
2204             wt.pool == this)           // a worker belonging to another pool counts as external
2205             q.push(task, this);
2206         else
2207             externalPush(task);
2208         return task;
2209     }
2210 
2211     /**
2212      * Pushes an external submission to the current carrier thread's work queue if
2213      * possible. This method is invoked (reflectively) by the virtual thread
2214      * implementation.
          *
          * <p>"Possible" means the carrier thread is a ForkJoinWorkerThread
          * whose pool is this pool; otherwise the task goes through
          * externalPush and signalOnEmpty is not consulted.
2215      *
          * @param task the Runnable to run; it is wrapped in a
          *        ForkJoinTask.RunnableExecuteAction before being queued
2216      * @param signalOnEmpty true to signal a worker when the queue is empty
2217      */
2218     private void externalExecuteTask(Runnable task, boolean signalOnEmpty) {
2219         var forkJoinTask = new ForkJoinTask.RunnableExecuteAction(task); // task is not null-checked in this method
2220         Thread t = VirtualThreads.currentCarrierThread();               // the carrier thread, not Thread.currentThread()
2221         if (t instanceof ForkJoinWorkerThread wt && wt.pool == this) {
2222             wt.workQueue.push(forkJoinTask, this, signalOnEmpty);       // NOTE(review): no workQueue null check here, unlike externalSubmit -- confirm it cannot be null
2223         } else {
2224             externalPush(forkJoinTask);
2225         }
2226     }
2227 
2228     /**
2229      * Returns common pool queue for an external thread that has
2230      * possibly ever submitted a common pool task (nonzero probe), or
2231      * null if none.
          *
          * <p>Null is also the result when the common pool or its queues
          * array is not set, the array is empty, or the selected slot
          * itself holds null.
2232      */
2233     static WorkQueue commonQueue() {
2234         ForkJoinPool p; WorkQueue[] qs;
2235         int r = ThreadLocalRandom.getProbe(), n;   // a zero probe always yields null below
2236         return ((p = common) != null && (qs = p.queues) != null &&
2237                 (n = qs.length) > 0 && r != 0) ?
2238             qs[(n - 1) & (r << 1)] : null;         // r << 1 clears bit 0, so only even-indexed slots are selected
2239     }
2240 
2241     /**
2242      * Returns queue for an external thread, if one exists.
2243      */
2244     final WorkQueue externalQueue() {
2245         WorkQueue[] qs;
2246         int r = ThreadLocalRandom.getProbe(), n;
2247         return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?

2357                 for (int j = 1; j < n; j += 2) { // unblock other workers
2358                     if ((q = qs[j]) != null && (thread = q.owner) != null &&
2359                         !thread.isInterrupted()) {
2360                         changed = true;
2361                         try {
2362                             thread.interrupt();
2363                         } catch (Throwable ignore) {
2364                         }
2365                     }
2366                 }
2367             }
2368             ReentrantLock lock; Condition cond; // signal when no workers
2369             if (((md = mode) & TERMINATED) == 0 &&
2370                 (md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2371                 (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2372                 (lock = registrationLock) != null) {
2373                 lock.lock();
2374                 if ((cond = termination) != null)
2375                     cond.signalAll();
2376                 lock.unlock();
2377                 container.close();
2378             }
2379             if (changed)
2380                 rescan = true;
2381             else if (rescan)
2382                 rescan = false;
2383             else
2384                 break;
2385         }
2386         return true;
2387     }
2388 
2389     // Exported methods
2390 
2391     // Constructors
2392 
2393     /**
2394      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2395      * java.lang.Runtime#availableProcessors}, using defaults for all
2396      * other parameters (see {@link #ForkJoinPool(int,
2397      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,

2556         if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
2557             throw new IllegalArgumentException();
2558         if (factory == null || unit == null)
2559             throw new NullPointerException();
2560         this.factory = factory;
2561         this.ueh = handler;
2562         this.saturate = saturate;
2563         this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2564         int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2565         int corep = Math.min(Math.max(corePoolSize, p), MAX_CAP);
2566         int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - p;
2567         int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2568         this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
2569         this.mode = p | (asyncMode ? FIFO : 0);
2570         this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
2571                     (((long)(-p)     << RC_SHIFT) & RC_MASK));
2572         this.registrationLock = new ReentrantLock();
2573         this.queues = new WorkQueue[size];
2574         String pid = Integer.toString(getAndAddPoolIds(1) + 1);
2575         this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";
2576 
2577         String name = "ForkJoinPool-" + pid;
2578         this.container = SharedThreadContainer.create(name);
2579     }
2580 
2581     // helper method for commonPool constructor
         // Reads the named system property as a class name. When it is set, the class
         // is loaded with the system class loader and instantiated through its public
         // no-argument constructor; when it is not set, the result is null.
         // Reflective failures propagate to the caller as ReflectiveOperationException.
2582     private static Object newInstanceFromSystemProperty(String property)
2583         throws ReflectiveOperationException {
2584         String className = System.getProperty(property);
2585         return (className == null)
2586             ? null
2587             : ClassLoader.getSystemClassLoader().loadClass(className)
2588             .getConstructor().newInstance();
2589     }
2590 
2591     /**
2592      * Constructor for common pool using parameters possibly
2593      * overridden by system properties
2594      */
2595     private ForkJoinPool(byte forCommonPoolOnly) {
2596         int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
2597         ForkJoinWorkerThreadFactory fac = null;
2598         UncaughtExceptionHandler handler = null;

2610         this.ueh = handler;
2611         this.keepAlive = DEFAULT_KEEPALIVE;
2612         this.saturate = null;
2613         this.workerNamePrefix = null;
2614         int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
2615         this.mode = p;
2616         if (p > 0) {
2617             size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
2618             this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2619             this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
2620                         (((long)(-p) << RC_SHIFT) & RC_MASK));
2621         } else {  // zero min, max, spare counts, 1 slot
2622             size = 1;
2623             this.bounds = 0;
2624             this.ctl = 0L;
2625         }
2626         this.factory = (fac != null) ? fac :
2627             new DefaultCommonPoolForkJoinWorkerThreadFactory();
2628         this.queues = new WorkQueue[size];
2629         this.registrationLock = new ReentrantLock();
2630 
2631         String name = "ForkJoinPool.commonPool";
2632         this.container = SharedThreadContainer.create(name);
2633     }
2634 
2635     /**
2636      * Returns the common pool instance. This pool is statically
2637      * constructed; its run state is unaffected by attempts to {@link
2638      * #shutdown} or {@link #shutdownNow}. However this pool and any
2639      * ongoing processing are automatically terminated upon program
2640      * {@link System#exit}.  Any program that relies on asynchronous
2641      * task processing to complete before program termination should
2642      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2643      * before exit.
2644      *
2645      * @return the common pool instance
2646      * @since 1.8
2647      */
2648     public static ForkJoinPool commonPool() {
2649         // assert common != null : "static init error";
2650         return common;   // plain read of the static field; no check or lazy creation happens here
2651     }
2652 
< prev index next >