☕【Java技术之旅】走进线程池的世界(基础篇)
本章内容属于线程原理分析专题的基础篇,之后还有源码篇和深入篇。
🚀 前提概要
🚀 线程池定义
🚀 线程池优点
降低资源消耗 :通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。 提高响应速度 :任务到达时,无需等待线程创建即可立即执行。 提高线程的可管理性 :线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。 提供更多更强大的功能 :线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor ,就允许任务延期执行或定期执行。
🚀 线程池的运作流程
📚 创建线程任务

📚 线程池的构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
参数说明如下
corePoolSize:核心池大小 maximumPoolSize:线程池大小(maximumPoolSize >= corePoolSize) keepAliveTime:没有任务时线程的存活时间,默认情况下只有线程数目大于 corePoolSize 时,此参数才起作用,若线程数目等于 corePoolSize,则这些线程会一直存活。但若调用 allowCoreThreadTimeOut(boolean)方法,则线程数目不大于 corePoolSize 时,此参数也起作用,直到线程数目为 0 unit:keepAliveTime 的时间单位,有以下七种取值: TimeUnit.DAYS;//天 TimeUnit.HOURS;//小时 TimeUnit.MINUTES;//分钟 TimeUnit.SECONDS;//秒 TimeUnit.MILLISECONDS;//毫秒 TimeUnit.MICROSECONDS;//微秒 TimeUnit.NANOSECONDS;//纳秒 workQueue:指定构成缓冲区的阻塞队列,即指定了线程池的排队策略,常用的有以下三种: ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小 LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE synchronousQueue:这个队列不会保存提交的任务,而是将直接新建一个线程来执行新来的任务 threadFactory(可选):线程工厂,用来创建线程,可自定义 handler(可选):拒绝策略,有以下四种策略可选: ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。 ThreadPoolExecutor.DiscardPolicy:丢弃任务,不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务 ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
🚀 线程池的运作原理
📚 核心线程池
📚 创建过程
📚 线程初始化
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
📚 任务提交
void execute(Runnable task),无返回值
Future submit(Runnable task, T result)
Future submit(Callable task),有返回值
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//获取ctl
//检查当前核心线程数,是否小于核心线程数的大小限制
if (workerCountOf(c) < corePoolSize) {
//没有达到核心线程数的大小限制,那么添家核心线程执行该任务
if (addWorker(command, true))
return;
//如果添加失败,刷新ctl值
c = ctl.get();
}
//再次检查线程池的运行状态,将任务添加到等待队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();//刷新ctl值
//如果当前线程池的装不是运行状态,那么移除刚才添加的任务
if (! isRunning(recheck) && remove(command))
reject(command);//移除成功后,使用拒绝策略处理该任务;
else if (workerCountOf(recheck) == 0)
//当前工作线程数为0
//线程池正在运行,或者移除任务失败。
//添加一个非核心线程,并不指定该线程的运行任务。
//等线程创建完成之后,会从等待队列中获取任务执行。
addWorker(null, false);
}
//逻辑到这里说明线程池已经不是RUNNING状态,或者等待队列已满,需要创建一个新的非核心线程执行该任务;
//如果创建失败,那么非核心线程已满,使用拒绝策略处理该任务;
else if (!addWorker(command, false))
reject(command);
}
🚀 阻塞队列
🚀 GC回收问题
🚀 业务峰值考虑
并发量的计算
运行模型
public void execute(Runnable command) {
...
if (isRunning(c) && workQueue.offer(command)) {
isRunning() 是判断线程池处理戚状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
...
}
对应的消费者源码如下:
private Runnable getTask() {
for (;;) {
...
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
...
}
}
🚀 可选线程池模型
📚 newCachedThreadPool(会创建过多的线程任务)
new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,
TimeUnit.SECONDS,new SynchronousQueue())
📚 newFixedThreadPool(会挤压过多的任务到队列里)
new ThreadPoolExecutor(nThreads,nThreads,0L,
TimeUnit.MILLISECONDS,new LinkedBlockingQueue())
📚 newSingleThreadPool(会挤压过多的任务到队列里)
new ThreadPoolExecutor(1,1,0L,
TimeUnit.MILLISECONDS,new LinkedBlockingQueue())
📚 newScheduledThreadPool(会创建过多的线程任务)
new ScheduledThreadPoolExecutor(nThreads,Integer.MAX_VALUE,0L,
TimeUnit.NANOSECONDS,new DelayedWorkQueue())
调用实例: 构造线程池时优先选用线程池模型,如果这些模型不能满足要求,再自定义 ThreadPoolExecutor 线程池
🚀 线程池的关闭
shutdown():不会立即关闭线程池,而是不再接受新的任务,等当前所有任务处理完之后关闭线程池
shutdownNow():立即关闭线程池,打断正在执行的任务,清空缓冲队列,返回尚未执行的任务