ThreadPoolExecutor源码解读(一)重新认识ThreadPoolExecutor(核心参数、生命周期、位运算、ThreadFactory、拒接策略)
一、前言
在Java中,创建一个线程,就像创建一个对象一样简单,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,并且要为线程分配一系列的资源,所以线程是一个重量级的对象,应该避免频繁创建和销毁。线程池的出现就是为了管理线程,让线程复用。
阿里巴巴java开发手册中有一条强制的编程规约:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
既然推荐通过的方式使用线程池,则有必要了解一些底层原理,如:线程池的几个核心参数,提交任务的过程,任务是如何在线程池中运行,线程是如何复用,线程池的状态流转等。了解了这些原理知识,不仅可以轻松应对面试,更重要的是在实际工作中更好、更正确的使用线程池;同时如果出了bug,懂些原理也能快速排查问题所在。
二、提交任务涉及到的核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
从ThreadPoolExecutor构造函数来看,不得不说的几个参数,核心线程数、最大线程数、工作队列、线程工厂以及拒绝策略,他们之间有着千丝万缕的关系:
白纸黑字总是苍白的,如下是提交任务至线程池的流程图:

除了具有主角光环的参数外,还有几个参数决定着工作线程的生死存亡。决定非核心线程数的线程的存活时长。当线程池中创建的线程数量超过设置的 ,在某些线程处理完任务后,如果等待 时间,仍然没有新的任务分配给它们,那么这些线程就属于空闲线程,将会被回收。线程池回收线程,没有所谓的“核心线程”和“非核心线程”之分,直到线程池的线程数等于最小核心线程数,回收才会停止。
看样子线程池一定会有小于等于数量的线程一直存活,这样如果这个线程池是非核心线程池,一直占用着线程势必会影响到核心线程池的运行,所以核心线程数内的线程也有被回收的需求。
在创建线程池时,构造函数中并没有显式设置核心线程数内的线程过期回收的参数,但是可以通过调用方法将属性设置为true,从而使得核心线程数内的线程空闲等待时间后,依然没有任务分配时被回收。
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
三、线程池的生命周期
世间万物都有生死轮回,线程池也不例外,它也有自己的生命周期。而巧妙的是,作者Doug Lea用一个32位的int变量表示两种含义:高3位表示线程池的运行状态,低29位表示工作线程数。
//一个ctl 表示两种含义,高3位为runState,低29为workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//536870911
//0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//-536870912
//111 0 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
//0
//000 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//536870912
//001 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
//1073741824
//010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
//1610612736
//011 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//从变量ctl中解析出runState
//先将CAPACITY做按位非操作,即~n = - ( n+1 ),就是 RUNNING
//然后再做按位与,可得出高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//从变量ctl中解析出workerCount
//对CAPACITY按位与,可得出低29位
//1 & 1 = 1,1 & 0 = 0,0 & 1 = 0,0 & 0 = 0
private static int workerCountOf(int c) { return c & CAPACITY; }
//将rs和wc转为二进制 再进行按位或计算,位上只要有1就是该位就是1
// 1 & 1 = 1,1 & 0 = 1,0 & 1 = 1,0 & 0 = 0
private static int ctlOf(int rs, int wc) { return rs | wc; }
作者是如何将一个32位int变量表示为两个含义的呢?这就涉及到二进制数的位运算:
,1转为二进制数向左移29位,得到,这个过程相当于1*2^29,但是此时得到的是高3位的第一位(最小值),在其基础上减1就是低29位的最大值了,得到,将其设置为线程池的容量。
,创建线程池后,线程池就处于正在运行状态,其是-1向左移29位,-1的二进制是,向左移29位后剩下3个1,低29位补0,得到,正好1占满了高3位。(-1的二进制是1的补码,原码取反+1就是补码,如1的原码是,取反后是,再加1就是。)
,当调用,线程池进入状态,是0向左移29位依然是0。
,当调用,线程池进入状态,1左移29位得到。
,是一个过渡状态,当线程池调用或后,当线程池中没有正在运行的线程且工作队列为空,此时设置线程池状态为。2左移29位得到。
,才是代表线程池真正的寿终正寝。3左移29位得到。
还有三个方法才是将两个含义揉捻成一个变量,又分别拆出两个含义:
,运行状态和工作线程数,二者进行按位或计算合成一个变量。(操作,,二者比较,只要位上有1,该位就是1。)
,从中拆出运行状态。先对做按位非操作,即,就是 。然后按位与操作,可得高3位。(高3位都是1,低29位都是0,所以运算后,1只会出现在高3位,故而可得高3位)
,从中拆出工作线程数。对按位与,可得低29位。(因为低29位都是1,高3位都是0,所以运算后,1只会出现在低29位,故而可得低29位)
(操作,,二者比较,位上都是1,该位才是1。)
如图所示线程池的生命周期流转图:

四、ThreadFactory如何创建工作线程
创建线程池时可以不传,此时会给一个默认线程工厂,而它究竟是怎样生产工作线程的呢?
实现了接口,其主要做了3件事:
创建工作线程,并设置分组和命名。
工作线程是守护线程时,将线程设置为非守护线程。
设置工作线程默认优先级。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
//java.util.concurrent.Executors.DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
//创建线程时,设置分组和命名
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
用户可以自行实现接口,设计特殊的线程工厂。
五、四种官方拒接策略
当工作线程数大于等于时,此时再提交任务将会触发拒绝策略。
创建线程池时也可不传,此时会给一个默认的拒绝策略。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
总是会抛出一个异常,再无其他操作。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
另外官方还提供了3种拒绝策略:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
除了官方提供的四个拒绝策略外,用户还可以自行实现接口设计特殊的拒绝策略。
六、总结
【强制】线程池不允许使用 去创建,而是通过 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
线程数小于时,提交任务会继续创建线程,大于等于时,提交任务将加到工作队列中。
当工作队列已满,此时提交任务会创建线程,直到线程数达到,再提交任务触发拒绝策略。
默认情况下线程池中只会保留小于等于数量的线程,多余的空闲线程会等待时间后,依然没有任务分配,则被回收。
可以将设置为true,使得数量内的线程,空闲等待时间后,依然没有任务分配,则被回收。
高3位表示线程池运行状态,低29位表示工作线程数。
和都可自定义。
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!