< prev index next >

src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java

Print this page

  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.ArrayList;
  39 import java.util.ConcurrentModificationException;
  40 import java.util.HashSet;
  41 import java.util.Iterator;
  42 import java.util.List;
  43 import java.util.concurrent.atomic.AtomicInteger;
  44 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  45 import java.util.concurrent.locks.Condition;
  46 import java.util.concurrent.locks.ReentrantLock;

  47 
  48 /**
  49  * An {@link ExecutorService} that executes each submitted task using
  50  * one of possibly several pooled threads, normally configured
  51  * using {@link Executors} factory methods.
  52  *
  53  * <p>Thread pools address two different problems: they usually
  54  * provide improved performance when executing large numbers of
  55  * asynchronous tasks, due to reduced per-task invocation overhead,
  56  * and they provide a means of bounding and managing the resources,
  57  * including threads, consumed when executing a collection of tasks.
  58  * Each {@code ThreadPoolExecutor} also maintains some basic
  59  * statistics, such as the number of completed tasks.
  60  *
  61  * <p>To be useful across a wide range of contexts, this class
  62  * provides many adjustable parameters and extensibility
  63  * hooks. However, programmers are urged to use the more convenient
  64  * {@link Executors} factory methods {@link
  65  * Executors#newCachedThreadPool} (unbounded thread pool, with
  66  * automatic thread reclamation), {@link Executors#newFixedThreadPool}

 460      * Otherwise exiting threads would concurrently interrupt those
 461      * that have not yet interrupted. It also simplifies some of the
 462      * associated statistics bookkeeping of largestPoolSize etc. We
 463      * also hold mainLock on shutdown and shutdownNow, for the sake of
 464      * ensuring workers set is stable while separately checking
 465      * permission to interrupt and actually interrupting.
 466      */
 467     private final ReentrantLock mainLock = new ReentrantLock();
 468 
 469     /**
 470      * Set containing all worker threads in pool. Accessed only when
 471      * holding mainLock.
 472      */
 473     private final HashSet<Worker> workers = new HashSet<>();
 474 
 475     /**
 476      * Wait condition to support awaitTermination.
 477      */
 478     private final Condition termination = mainLock.newCondition();
 479 





 480     /**
 481      * Tracks largest attained pool size. Accessed only under
 482      * mainLock.
 483      */
 484     private int largestPoolSize;
 485 
 486     /**
 487      * Counter for completed tasks. Updated only on termination of
 488      * worker threads. Accessed only under mainLock.
 489      */
 490     private long completedTaskCount;
 491 
 492     /*
 493      * All user control parameters are declared as volatiles so that
 494      * ongoing actions are based on freshest values, but without need
 495      * for locking, since no internal invariants depend on them
 496      * changing synchronously with respect to other actions.
 497      */
 498 
 499     /**

 709         for (;;) {
 710             int c = ctl.get();
 711             if (isRunning(c) ||
 712                 runStateAtLeast(c, TIDYING) ||
 713                 (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
 714                 return;
 715             if (workerCountOf(c) != 0) { // Eligible to terminate
 716                 interruptIdleWorkers(ONLY_ONE);
 717                 return;
 718             }
 719 
 720             final ReentrantLock mainLock = this.mainLock;
 721             mainLock.lock();
 722             try {
 723                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
 724                     try {
 725                         terminated();
 726                     } finally {
 727                         ctl.set(ctlOf(TERMINATED, 0));
 728                         termination.signalAll();

 729                     }
 730                     return;
 731                 }
 732             } finally {
 733                 mainLock.unlock();
 734             }
 735             // else retry on failed CAS
 736         }
 737     }
 738 
 739     /*
 740      * Methods for controlling interrupts to worker threads.
 741      */
 742 
 743     /**
 744      * If there is a security manager, makes sure caller has
 745      * permission to shut down threads in general (see shutdownPerm).
 746      * If this passes, additionally makes sure the caller is allowed
 747      * to interrupt each worker thread. This might not be true even if
 748      * first check passed, if the SecurityManager treats some threads

 925                 try {
 926                     // Recheck while holding lock.
 927                     // Back out on ThreadFactory failure or if
 928                     // shut down before lock acquired.
 929                     int c = ctl.get();
 930 
 931                     if (isRunning(c) ||
 932                         (runStateLessThan(c, STOP) && firstTask == null)) {
 933                         if (t.getState() != Thread.State.NEW)
 934                             throw new IllegalThreadStateException();
 935                         workers.add(w);
 936                         workerAdded = true;
 937                         int s = workers.size();
 938                         if (s > largestPoolSize)
 939                             largestPoolSize = s;
 940                     }
 941                 } finally {
 942                     mainLock.unlock();
 943                 }
 944                 if (workerAdded) {
 945                     t.start();
 946                     workerStarted = true;
 947                 }
 948             }
 949         } finally {
 950             if (! workerStarted)
 951                 addWorkerFailed(w);
 952         }
 953         return workerStarted;
 954     }
 955 
 956     /**
 957      * Rolls back the worker thread creation.
 958      * - removes worker from workers, if present
 959      * - decrements worker count
 960      * - rechecks for termination, in case the existence of this
 961      *   worker was holding up termination
 962      */
 963     private void addWorkerFailed(Worker w) {
 964         final ReentrantLock mainLock = this.mainLock;
 965         mainLock.lock();

1292     public ThreadPoolExecutor(int corePoolSize,
1293                               int maximumPoolSize,
1294                               long keepAliveTime,
1295                               TimeUnit unit,
1296                               BlockingQueue<Runnable> workQueue,
1297                               ThreadFactory threadFactory,
1298                               RejectedExecutionHandler handler) {
1299         if (corePoolSize < 0 ||
1300             maximumPoolSize <= 0 ||
1301             maximumPoolSize < corePoolSize ||
1302             keepAliveTime < 0)
1303             throw new IllegalArgumentException();
1304         if (workQueue == null || threadFactory == null || handler == null)
1305             throw new NullPointerException();
1306         this.corePoolSize = corePoolSize;
1307         this.maximumPoolSize = maximumPoolSize;
1308         this.workQueue = workQueue;
1309         this.keepAliveTime = unit.toNanos(keepAliveTime);
1310         this.threadFactory = threadFactory;
1311         this.handler = handler;



1312     }
1313 
1314     /**
1315      * Executes the given task sometime in the future.  The task
1316      * may execute in a new thread or in an existing pooled thread.
1317      *
1318      * If the task cannot be submitted for execution, either because this
1319      * executor has been shutdown or because its capacity has been reached,
1320      * the task is handled by the current {@link RejectedExecutionHandler}.
1321      *
1322      * @param command the task to execute
1323      * @throws RejectedExecutionException at discretion of
1324      *         {@code RejectedExecutionHandler}, if the task
1325      *         cannot be accepted for execution
1326      * @throws NullPointerException if {@code command} is null
1327      */
1328     public void execute(Runnable command) {
1329         if (command == null)
1330             throw new NullPointerException();
1331         /*

  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.ArrayList;
  39 import java.util.ConcurrentModificationException;
  40 import java.util.HashSet;
  41 import java.util.Iterator;
  42 import java.util.List;
  43 import java.util.concurrent.atomic.AtomicInteger;
  44 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  45 import java.util.concurrent.locks.Condition;
  46 import java.util.concurrent.locks.ReentrantLock;
  47 import jdk.internal.vm.SharedThreadContainer;
  48 
  49 /**
  50  * An {@link ExecutorService} that executes each submitted task using
  51  * one of possibly several pooled threads, normally configured
  52  * using {@link Executors} factory methods.
  53  *
  54  * <p>Thread pools address two different problems: they usually
  55  * provide improved performance when executing large numbers of
  56  * asynchronous tasks, due to reduced per-task invocation overhead,
  57  * and they provide a means of bounding and managing the resources,
  58  * including threads, consumed when executing a collection of tasks.
  59  * Each {@code ThreadPoolExecutor} also maintains some basic
  60  * statistics, such as the number of completed tasks.
  61  *
  62  * <p>To be useful across a wide range of contexts, this class
  63  * provides many adjustable parameters and extensibility
  64  * hooks. However, programmers are urged to use the more convenient
  65  * {@link Executors} factory methods {@link
  66  * Executors#newCachedThreadPool} (unbounded thread pool, with
  67  * automatic thread reclamation), {@link Executors#newFixedThreadPool}

 461      * Otherwise exiting threads would concurrently interrupt those
 462      * that have not yet interrupted. It also simplifies some of the
 463      * associated statistics bookkeeping of largestPoolSize etc. We
 464      * also hold mainLock on shutdown and shutdownNow, for the sake of
 465      * ensuring workers set is stable while separately checking
 466      * permission to interrupt and actually interrupting.
 467      */
 468     private final ReentrantLock mainLock = new ReentrantLock();
 469 
 470     /**
 471      * Set containing all worker threads in pool. Accessed only when
 472      * holding mainLock.
 473      */
 474     private final HashSet<Worker> workers = new HashSet<>();
 475 
 476     /**
 477      * Wait condition to support awaitTermination.
 478      */
 479     private final Condition termination = mainLock.newCondition();
 480 
 481     /**
 482      * The thread container for the worker threads.
 483      */
 484     private final SharedThreadContainer container;
 485 
 486     /**
 487      * Tracks largest attained pool size. Accessed only under
 488      * mainLock.
 489      */
 490     private int largestPoolSize;
 491 
 492     /**
 493      * Counter for completed tasks. Updated only on termination of
 494      * worker threads. Accessed only under mainLock.
 495      */
 496     private long completedTaskCount;
 497 
 498     /*
 499      * All user control parameters are declared as volatiles so that
 500      * ongoing actions are based on freshest values, but without need
 501      * for locking, since no internal invariants depend on them
 502      * changing synchronously with respect to other actions.
 503      */
 504 
 505     /**

 715         for (;;) {
 716             int c = ctl.get();
 717             if (isRunning(c) ||
 718                 runStateAtLeast(c, TIDYING) ||
 719                 (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
 720                 return;
 721             if (workerCountOf(c) != 0) { // Eligible to terminate
 722                 interruptIdleWorkers(ONLY_ONE);
 723                 return;
 724             }
 725 
 726             final ReentrantLock mainLock = this.mainLock;
 727             mainLock.lock();
 728             try {
 729                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
 730                     try {
 731                         terminated();
 732                     } finally {
 733                         ctl.set(ctlOf(TERMINATED, 0));
 734                         termination.signalAll();
 735                         container.close();
 736                     }
 737                     return;
 738                 }
 739             } finally {
 740                 mainLock.unlock();
 741             }
 742             // else retry on failed CAS
 743         }
 744     }
 745 
 746     /*
 747      * Methods for controlling interrupts to worker threads.
 748      */
 749 
 750     /**
 751      * If there is a security manager, makes sure caller has
 752      * permission to shut down threads in general (see shutdownPerm).
 753      * If this passes, additionally makes sure the caller is allowed
 754      * to interrupt each worker thread. This might not be true even if
 755      * first check passed, if the SecurityManager treats some threads

 932                 try {
 933                     // Recheck while holding lock.
 934                     // Back out on ThreadFactory failure or if
 935                     // shut down before lock acquired.
 936                     int c = ctl.get();
 937 
 938                     if (isRunning(c) ||
 939                         (runStateLessThan(c, STOP) && firstTask == null)) {
 940                         if (t.getState() != Thread.State.NEW)
 941                             throw new IllegalThreadStateException();
 942                         workers.add(w);
 943                         workerAdded = true;
 944                         int s = workers.size();
 945                         if (s > largestPoolSize)
 946                             largestPoolSize = s;
 947                     }
 948                 } finally {
 949                     mainLock.unlock();
 950                 }
 951                 if (workerAdded) {
 952                     container.start(t);
 953                     workerStarted = true;
 954                 }
 955             }
 956         } finally {
 957             if (! workerStarted)
 958                 addWorkerFailed(w);
 959         }
 960         return workerStarted;
 961     }
 962 
 963     /**
 964      * Rolls back the worker thread creation.
 965      * - removes worker from workers, if present
 966      * - decrements worker count
 967      * - rechecks for termination, in case the existence of this
 968      *   worker was holding up termination
 969      */
 970     private void addWorkerFailed(Worker w) {
 971         final ReentrantLock mainLock = this.mainLock;
 972         mainLock.lock();

1299     public ThreadPoolExecutor(int corePoolSize,
1300                               int maximumPoolSize,
1301                               long keepAliveTime,
1302                               TimeUnit unit,
1303                               BlockingQueue<Runnable> workQueue,
1304                               ThreadFactory threadFactory,
1305                               RejectedExecutionHandler handler) {
1306         if (corePoolSize < 0 ||
1307             maximumPoolSize <= 0 ||
1308             maximumPoolSize < corePoolSize ||
1309             keepAliveTime < 0)
1310             throw new IllegalArgumentException();
1311         if (workQueue == null || threadFactory == null || handler == null)
1312             throw new NullPointerException();
1313         this.corePoolSize = corePoolSize;
1314         this.maximumPoolSize = maximumPoolSize;
1315         this.workQueue = workQueue;
1316         this.keepAliveTime = unit.toNanos(keepAliveTime);
1317         this.threadFactory = threadFactory;
1318         this.handler = handler;
1319 
1320         String name = getClass().getName() + "@" + System.identityHashCode(this);
1321         this.container = SharedThreadContainer.create(name);
1322     }
1323 
1324     /**
1325      * Executes the given task sometime in the future.  The task
1326      * may execute in a new thread or in an existing pooled thread.
1327      *
1328      * If the task cannot be submitted for execution, either because this
1329      * executor has been shutdown or because its capacity has been reached,
1330      * the task is handled by the current {@link RejectedExecutionHandler}.
1331      *
1332      * @param command the task to execute
1333      * @throws RejectedExecutionException at discretion of
1334      *         {@code RejectedExecutionHandler}, if the task
1335      *         cannot be accepted for execution
1336      * @throws NullPointerException if {@code command} is null
1337      */
1338     public void execute(Runnable command) {
1339         if (command == null)
1340             throw new NullPointerException();
1341         /*
< prev index next >