原创

一文读懂Executors.newFixedThreadPool(int)

这段时间没有在博客更新文章,陆续再头条上更新了几篇,没有做同步,惰性真的很可怕,已经一个月没没写文章了,也有一部分是工作原因把。除此之外好多文章我都写在了本地,没有往博客上发,还是那句话,要时刻监督自己,博客坚持写下去。

我觉的今天的内容对于初学多线程的人也是干货,这些原理要是懂了,别人做多线程时你也能看懂为什么这样做,甚至可以优化他写的代码,废话不多说了


主角登场

package java.util.concurrent;
//这里省略了导包
public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
         /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue, using the provided
     * ThreadFactory to create new threads when needed.  At any point,
     * at most {@code nThreads} threads will be active processing
     * tasks.  If additional tasks are submitted when all threads are
     * active, they will wait in the queue until a thread is
     * available.  If any thread terminates due to a failure during
     * execution prior to shutdown, a new one will take its place if
     * needed to execute subsequent tasks.  The threads in the pool will
     * exist until it is explicitly {@link ExecutorService#shutdown
     * shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
        //这里删除N多个方法

    /** Cannot instantiate. 看到下面的构造方法,心里有没有感想呢?*/
    private Executors() {}
}

上面这两个方法是创建固定数量的线程池的两种方法,两者的区别是:第二种创建方法多了一个线程工厂的方法。我们继续看ThreadPoolExecutor这个类中的构造函数:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                        threadFactory, defaultHandler);
}
//ThreadPollExecutor中的所有的构造函数最终都会调用下面这个构造函数,接下来分析这个构造做了什么吧
   /**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 线程池启动后,在池中保持的线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的。
 例如corePoolSize被设置为10,而任务数量只有5,则线程池中最多会启动5个线程,而不是一次性地启动10个线程,
 此外,如果任务来了,线程数少于10,则会继续创建新线程去处理任务,即便线程池中有空闲线程也不会使用。

 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 线程池中能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理。

 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 线程的最大生命周期。这里的生命周期有两个约束条件:
     一:该参数针对的是超过corePoolSize数量的线程;
     二:处于非运行状态的线程。举个例子:如果corePoolSize(最小线程数)为10,maxinumPoolSize(最大线程数)为20,而此时线程池中有15个线程在运行,过了一段时间后,其中有3个线程处于等待状态的时间超过keepAliveTime指定的时间,则结束这3个线程,此时线程池中则还有12个线程正在运行

 * @param unit the time unit for the {@code keepAliveTime} argument
 这是keepAliveTime的时间单位,可以是纳秒,毫秒,秒,分钟等。

 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。这个任务队列是一个阻塞式的单端队列。
     ArrayBlockingQueue:有界队列(基于数组的)
    LinkedBlockingQueue:有/无界队列(基于链表的,传参就是有界,不传就是无界)
    SynchronousQueue:同步移交队列(需要一个线程调用put方法插入值,另一个线程调用take方法删除值)

 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 定义如何启动一个线程,可以设置线程的名称,并且可以确定是否是后台线程等。

 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 拒绝任务处理器。用于处理超出线程数量和队列容量而对继续增加的任务进行处理的程序。

 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

有界队列

如果你把队列做成有界队列,队列满了此时会怎么做呢?这个时候假设你的MaxnumPoolSize是比corePoolSize大的,此时会继续创建额外的线程进入线程池里,来处理任务,然后超过corePoolSize数量的线程如果处理完了一个任务,也会尝试从队列中去获取任务来执行。
如果队列还是满的,此时新来的任务会怎么办?只能被reject掉了,它有几种reject策略,可以传入RejectedExecutionHander
(1)AbortPolicy
(2)DiscardPolicy
(3)DiscardOldestPolicy
(4)CallerRunsPolicy
(5)自定义
如果任务满满没了,线程空闲了超过corePoolSize的线程就会被自动释放掉,再keepAliveTime之后释放。

无界队列

无界队列就是这个队列无限大,可以不断把任务放进去,但是这样也是有问题,如果任务不能被处理完,再无限增加,会导致服务器内存资源耗尽,系统也会崩溃。即使没有崩溃,也hi导致你的机器cpu load,负载特别高,反应慢。

我们该使用哪种队列

其实不管怎样都会出问题,这个要根据自己的系统选择一个更优的策略,一般无界的使用的比较多,只要不是任务被阻塞,任务能顺利的执行完成就不会导致服务内存出现问题。

远程服务器异常的情况下,使用无界队列的情况

如果远程服务器异常,会导致调用超时,使得队列变得越来越大,此时会导致内存飙升,而且可能会导致OOM.

不管怎样使用,我们都得小心使用多线程,使用的时候一定要规范,不然真的很容易出问题!

正文到此结束(点击广告是对作者最大的支持)