Bootstrap

ThreadPoolExecutor源码解读(一)重新认识ThreadPoolExecutor(核心参数、生命周期、位运算、ThreadFactory、拒接策略)

一、前言

在Java中,创建一个线程,就像创建一个对象一样简单,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,并且要为线程分配一系列的资源,所以线程是一个重量级的对象,应该避免频繁创建和销毁。线程池的出现就是为了管理线程,让线程复用。

阿里巴巴java开发手册中有一条强制的编程规约:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样

的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

既然推荐通过的方式使用线程池,则有必要了解一些底层原理,如:线程池的几个核心参数,提交任务的过程,任务是如何在线程池中运行,线程是如何复用,线程池的状态流转等。了解了这些原理知识,不仅可以轻松应对面试,更重要的是在实际工作中更好、更正确的使用线程池;同时如果出了bug,懂些原理也能快速排查问题所在。

二、提交任务涉及到的核心参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

从ThreadPoolExecutor构造函数来看,不得不说的几个参数,核心线程数、最大线程数、工作队列、线程工厂以及拒绝策略,他们之间有着千丝万缕的关系:

白纸黑字总是苍白的,如下是提交任务至线程池的流程图:

除了具有主角光环的参数外,还有几个参数决定着工作线程的生死存亡。决定非核心线程数的线程的存活时长。当线程池中创建的线程数量超过设置的 ,在某些线程处理完任务后,如果等待 时间,仍然没有新的任务分配给它们,那么这些线程就属于空闲线程,将会被回收。线程池回收线程,没有所谓的“核心线程”和“非核心线程”之分,直到线程池的线程数等于最小核心线程数,回收才会停止。

看样子线程池一定会有小于等于数量的线程一直存活,这样如果这个线程池是非核心线程池,一直占用着线程势必会影响到核心线程池的运行,所以核心线程数内的线程也有被回收的需求。

在创建线程池时,构造函数中并没有显式设置核心线程数内的线程过期回收的参数,但是可以通过调用方法将属性设置为true,从而使得核心线程数内的线程空闲等待时间后,依然没有任务分配时被回收。

public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}

三、线程池的生命周期

世间万物都有生死轮回,线程池也不例外,它也有自己的生命周期。而巧妙的是,作者Doug Lea用一个32位的int变量表示两种含义:高3位表示线程池的运行状态,低29位表示工作线程数。

//一个ctl 表示两种含义,高3位为runState,低29为workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//536870911
//0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//-536870912
//111 0 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING    = -1 << COUNT_BITS;
//0
//000 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//536870912
//001 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
//1073741824
//010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
//1610612736
//011 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
//从变量ctl中解析出runState
//先将CAPACITY做按位非操作,即~n = - ( n+1 ),就是 RUNNING
//然后再做按位与,可得出高3位
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//从变量ctl中解析出workerCount
//对CAPACITY按位与,可得出低29位
//1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0
private static int workerCountOf(int c)  { return c & CAPACITY; }
//将rs和wc转为二进制 再进行按位或计算,位上只要有1就是该位就是1
// 1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0
private static int ctlOf(int rs, int wc) { return rs | wc; }

作者是如何将一个32位int变量表示为两个含义的呢?这就涉及到二进制数的位运算:

  • ,1转为二进制数向左移29位,得到,这个过程相当于1*2^29,但是此时得到的是高3位的第一位(最小值),在其基础上减1就是低29位的最大值了,得到,将其设置为线程池的容量。

  • ,创建线程池后,线程池就处于正在运行状态,其是-1向左移29位,-1的二进制是,向左移29位后剩下3个1,低29位补0,得到,正好1占满了高3位。(-1的二进制是1的补码,原码取反+1就是补码,如1的原码是,取反后是,再加1就是。)

  • ,当调用,线程池进入状态,是0向左移29位依然是0。

  • ,当调用,线程池进入状态,1左移29位得到。

  • ,是一个过渡状态,当线程池调用或后,当线程池中没有正在运行的线程且工作队列为空,此时设置线程池状态为。2左移29位得到。

  • ,才是代表线程池真正的寿终正寝。3左移29位得到。

还有三个方法才是将两个含义揉捻成一个变量,又分别拆出两个含义:

  • ,运行状态和工作线程数,二者进行按位或计算合成一个变量。(操作,,二者比较,只要位上有1,该位就是1。)

  • ,从中拆出运行状态。先对做按位非操作,即,就是 。然后按位与操作,可得高3位。(高3位都是1,低29位都是0,所以运算后,1只会出现在高3位,故而可得高3位)

  • ,从中拆出工作线程数。对按位与,可得低29位。(因为低29位都是1,高3位都是0,所以运算后,1只会出现在低29位,故而可得低29位)

(操作,,二者比较,位上都是1,该位才是1。)

如图所示线程池的生命周期流转图:

四、ThreadFactory如何创建工作线程

创建线程池时可以不传,此时会给一个默认线程工厂,而它究竟是怎样生产工作线程的呢?

实现了接口,其主要做了3件事:

  • 创建工作线程,并设置分组和命名。

  • 工作线程是守护线程时,将线程设置为非守护线程。

  • 设置工作线程默认优先级。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}
//java.util.concurrent.Executors.DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        //创建线程时,设置分组和命名
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

用户可以自行实现接口,设计特殊的线程工厂。

五、四种官方拒接策略

当工作线程数大于等于时,此时再提交任务将会触发拒绝策略。

创建线程池时也可不传,此时会给一个默认的拒绝策略。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

总是会抛出一个异常,再无其他操作。

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

另外官方还提供了3种拒绝策略:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    
    public CallerRunsPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

除了官方提供的四个拒绝策略外,用户还可以自行实现接口设计特殊的拒绝策略。

六、总结

  • 【强制】线程池不允许使用 去创建,而是通过 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

  • 线程数小于时,提交任务会继续创建线程,大于等于时,提交任务将加到工作队列中。

  • 当工作队列已满,此时提交任务会创建线程,直到线程数达到,再提交任务触发拒绝策略。

  • 默认情况下线程池中只会保留小于等于数量的线程,多余的空闲线程会等待时间后,依然没有任务分配,则被回收。

  • 可以将设置为true,使得数量内的线程,空闲等待时间后,依然没有任务分配,则被回收。

  • 高3位表示线程池运行状态,低29位表示工作线程数。

  • 和都可自定义。

PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!