Bootstrap

【万字图文-原创】 | 学会Java中的线程池,这一篇也许就够了!

前言

Java中的线程池已经不是什么神秘的技术了,相信在看的读者在项目中也都有使用过。关于线程池的文章也是数不胜数,我们站在巨人的肩膀上来再次梳理一下。

本文还是保持原有的风格,图文解析,尽量做到多画图!全文共20000+字,建议收藏后细细品读,阅读期间搭配源码食用效果更佳!

读完此文你将学到:

本文源码基于JDK1.8

线程池基本概念

线程池是一种池化思想的产物,如同我们数据库有连接池、Java中的常量池。线程池可以帮助我们管理线程、复用线程,减少线程频繁新建、销毁等带来的开销。

在Java中是通过类来创建一个线程池的,一般我们建议项目中自己去定义线程池,不推荐使用提供的工具类去构建线程池。

查看阿里巴巴开发手册中也有对线程池的一些建议:

创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。自定义线程工厂,并且根据外部特征进行分组,比如,来自同一机房的调用,把机房编号赋值给

public class UserThreadFactory implements ThreadFactory {

    private final String namePrefix;
    private final AtomicInteger nextId = new AtomicInteger(1);

    UserThreadFactory(String whatFeaturOfGroup) {
        namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
    }

    @Override
    public Thread newThread(Runnable task) {
        String name = namePrefix + nextId.getAndIncrement();
        Thread thread = new Thread(null, task, name, 0, false);
        System.out.println(thread.getName());
        return thread;
    }
}

线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。2) CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

线程池使用示例

下面是一个自定义的线程池,这是之前公司在用的一个线程池,修改其中部分属性和备注做脱敏处理:

public class MyThreadPool {
    static final Logger LOGGER = LoggerFactory.getLogger(MyThreadPool.class);

    private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

    private static final String THREAD_POOL_NAME = "MyThreadPool-%d";

    private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
            .daemon(true).build();

    private static final int DEFAULT_SIZE = 500;

    private static final long DEFAULT_KEEP_ALIVE = 60L;

    private static ExecutorService executor;

    private static BlockingQueue executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);

    static {
        try {
            executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT + 2, DEFAULT_KEEP_ALIVE,
                    TimeUnit.SECONDS, executeQueue, FACTORY);

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("MyThreadPool shutting down.");
                    executor.shutdown();

                    try {
                        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                            LOGGER.error("MyThreadPool shutdown immediately due to wait timeout.");
                            executor.shutdownNow();
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("MyThreadPool shutdown interrupted.");
                        executor.shutdownNow();
                    }

                    LOGGER.info("MyThreadPool shutdown complete.");
                }
            }));
        } catch (Exception e) {
            LOGGER.error("MyThreadPool init error.", e);
            throw new ExceptionInInitializerError(e);
        }
    }

    private MyThreadPool() {
    }

    public static boolean execute(Runnable task) {

        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            LOGGER.error("Task executing was rejected.", e);
            return false;
        }

        return true;
    }

    public static  Future submitTask(Callable task) {

        try {
            return executor.submit(task);
        } catch (RejectedExecutionException e) {
            LOGGER.error("Task executing was rejected.", e);
            throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
        }
    }
}

这里主要就是使用调用构造函数来构造一个线程池,指定自定义的,里面包含我们自己线程池的等信息。重写里面的和方法。 添加了系统关闭时的钩子函数,在里面调用线程池的方法,使得系统在退出(使用ctrl c或者kill -15 pid)时能够优雅的关闭线程池。

如果有看不懂的小伙伴也没有关系,后面会详细分析中的源码,相信看完后面的代码再回头来看这个用例 就完全是小菜一碟了。

线程池实现原理

通过上面的示例代码,我们需要知道创建线程池时几个重要的属性:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

: 线程池核心线程数量: 线程池最大线程数量: 线程池中阻塞队列,一般指定队列大小

线程池中数据模型可以简化成下图所示,其中应该是添加的一个个,这里标注的是为了方便理解:

线程池中提交一个任务具体执行流程如下图:

提交任务时,比较当前线程池中线程数量和核心线程数的大小,根据比较结果走不同的任务处理策略,这个下面会有详细说明。

线程池中核心方法调用链路:

TheadPoolExecutor源码初探

中常用属性和方法较多,我们可以先分析下这些,然后一步步往下深入,常用属性和方法如下:

具体代码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

     private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }
}

RUNNING:

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。(2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0

SHUTDOWN:

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN

STOP:

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP

TIDYING:

(1) 状态说明:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING

TERMINATED:

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED

状态的变化流转:

execute()源码分析

当有任务提交到线程池时,就会直接调用方法,执行流程如下:

从流程图可看,添加任务会有三个分支判断,源码如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

在这里代表线程池的值,包含工作任务数量以及线程池的状态,上面有解释过。

接着看下面几个分支代码:

分支一: ,条件成立表示当前线程数量小于核心线程数,此次提交任务,直接创建一个新的。

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

如果线程数小于核心线程数,执行操作,这个后面会讲这个方法的细节,如果添加成功则直接返回,失败后会重新计算的值,然后执行分支二。

针对执行失败的情况,有以下几种可能:

分支二:

通过分支一流程的分析,我们可以知道执行到这个分支说明**当前线程数量已经达到或者执行失败,我们先看看分支二执行流程:

首先判断当前线程池是否处于状态,如果是则尝试将放入到中,是我们在初始化时传入进来的阻塞队列。

如果当前任务成功添加到阻塞队列中,再次获取赋值给变量,然后执行:

if (!isRunning(recheck) && remove(command))
    reject(command);

再次判断当前线程池是否为状态,如果不是则说明提交任务到队列之后,线程池状态被其他线程给修改了,比如调用等。这种情况就需要把刚刚提交到队列中的的任务删除掉。

再看下remove()方法:

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate();
    return removed;
}

如果任务提交到队列之后,线程池中的线程还未将这个任务消费,那么就可以成功,调用方法来执行拒绝策略。如果在改变线程池状态之前,队列中的数据已经被消费了,此时就会失败。

接着走中的逻辑:

else if (workerCountOf(recheck) == 0)
    addWorker(null, false);

走这个逻辑有两种可能,线程池是状态或者线程池状态被改变且中添加的任务已经被消费导致失败。如果是状态,线程池中的线程数量是0,此时中还有待执行的任务,就需要新增一个(里面会有创建线程的操作),继续消费中的任务。

这里要注意一下,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到中了,所以在执行的时候,会直接从中获取任务。在时执行也是为了保证线程池在状态下必须要有一个线程来执行任务,可以理解为一种担保兜底机制。

至于线程池中线程为何可以为0?这个如果我们设置了,那么核心线程也是允许被回收的,后面中代码有提及。

分支三:

通过分支一和分之二的分析,进入这个分支的前置条件:线程数超过核心线程数且中数据已满。

,执行添加操作,如果执行失败就直接走拒绝策略。这里添加失败可能是线程数已经超过了。

addWorker()源码分析

上面分析提交任务的方法时多次用到方法,接收任务后将任务添加到中。

是中的内部类,继承自且实现了接口。 类中包含,它是内部封装的工作线程,还有属性,它是一个可执行的对象。在的构造函数中,使用线程工厂创建了一个线程,当启动的时候,会以为入口启动线程,这里会直接调用到中。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

流程如下图:

这里再回头看下addWorker(Runnable firstTask, boolean core) 方法,这个方法主要是添加一个到线程池中并执行,参数用于指定新增的线程执行的第一个任务,参数为true表示在新增线程时会判断当前活动线程数是否少于,表示在新增线程时会判断当前活动线程数是否少于

addWorker方法整体执行流程图如下:

接着看下源码:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里是有两层循环,外层循环主要是判断线程池的状态,如果状态不合法就直接返回.

只有两种情况属于合法状态:

第二层循环是通过操作更新数量,如果更新成功则往线程池中中添加线程,这个所谓的线程池就是一个数组。添加失败时判断失败原因,失败有两种原因:线程池状态被改变或者并发情况修改线程池中数量,这两种情况都会导致值被修改。如果是第二种原因导致的失败,继续自旋更新数量。

接着继续分析循环内部的实现,先看看第一层循环:代表线程池值,代表线程池运行状态。

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
    return false;

条件一: 成立, 说明当前线程池状态不是状态

条件二:

我们之前提到过,创建任务有两种情况:1)状态可以提交任务,2)状态下如果传递的任务是空且阻塞队列中还有任务未处理的情况才是允许创建任务继续处理的,因为阻塞队列中的任务仍然需要继续处理。

上面的条件一和条件二就是处理状态下任务创建操作的判断。

接着分析第二层循环,先是判断线程池数量是否大于可创建的最大值,或者是否超过了核心线程数/最大线程数,如果是则直接返回,操作失败。

接着使用将线程池中,这里使用的是操作,如果成功则直接跳出最外层循环。

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();
    if (runStateOf(c) != rs)
        continue retry;
}

如果失败,说明此时有竞争,会重新获取的值,判断竞争失败的原因是添加数量还是修改线程池状态导致的,如果线程池状态未发生改变,就继续循环尝试增加数量,接着看循环结束后逻辑:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive())
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (!workerStarted)
        addWorkerFailed(w);
}

这里代表是否已经启动,代表创建的是否添加到池子中,这里所谓的池子就是全局定义的一个结构的变量。

接着根据传递的来构建一个,在的构造方法中也会通过创建一个线程,这里判断是因为用户可以自定义,如果这里用户不是创建线程而是直接返回则会出现一些问题,所以需要判断一下。

w = new Worker(firstTask);
final Thread t = w.thread;

if (t != null) {

}

在往池子中添加的时候,是需要先加锁的,因为针对全局的操作并不是线程安全的。

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

继续看下面代码,代表当前线程池的状态,这里还是判断线程池的状态,如果代表线程池状态是状态,此时可以直接操作。如果是状态,需要满足才可以继续操作。因为在状态时不会再添加新的任务,但还是可以继续处理中的任务。

当线程后,线程会返回,这里还是防止自定义的创建线程返回给外部之前,将线程了,由此可见考虑问题真的很全面。

int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive())
        throw new IllegalThreadStateException();
    workers.add(w);
}

接着将创建的添加到集合中,设置,这个属性是线程池生命周期内线程数最大值,一般是做统计数据用的。 最后修改,代表当前提交的任务所创建的已经添加到池子中了。

添加成功后,调用线程的方法启动线程,因为中重写了方法,最后会执行。最后设置后释放全局锁。

int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {

    workers.add(w);
    int s = workers.size();
    if (s > largestPoolSize)
        largestPoolSize = s;

    orkerAdded = true;
}

这里再回头看看的情形,如果线程池在之前,状态发生了变化,导致添加失败。此时也会为,最后执行操作,这个方法是将从中移除掉,然后将数量减一,最后执行)来尝试关闭线程池,这个方法后面会细说。

runWorker()源码分析

在类中的方法调用了来执行任务。上面方法正常的执行逻辑会创建一个,然后启动中的线程,这里其实就会执行到方法。

的执行逻辑很简单,启动一个线程,执行当前传递的任务,执行完后又不断的从中获取任务继续执行,如果当前数量小于核心线程数且队列中没有了任务,当前线程会被阻塞,这个就是的逻辑,一会会讲到。

如果当前线程数大于核心线程数且队列中没有任务,就会返回,在这边退出循环,回收多余的数据。

源码如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

这里是为了初始化当前中,然后设置独占线程为,因为在方法中会尝试获取中的锁,如果获取成功代表当前线程没有被加锁处于空闲状态,给当前线程一个中断信号。所以这里在执行线程任务的时候需要加锁,防止调用的时候给当前线程一个中断信号。

判断是否为空,如果是一个空任务,那么就去中获取任务,如果两者都为空就会退出循环。

while (task != null || (task = getTask()) != null) {}

最核心的就是调用启动当前任务,这里面还有两个可扩展的方法,分别是beforeExecute()/afterExecute(),我们可以在任务执行前和执行后分别自定义一些操作,其中可以接收到任务抛出的异常信息,方便我们做后续处理。

while (task != null || (task = getTask()) != null) {
    try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
            task.run();
        } catch (RuntimeException x) {
            thrown = x; throw x;
        } catch (Error x) {
            thrown = x; throw x;
        } catch (Throwable x) {
            thrown = x; throw new Error(x);
        } finally {
            afterExecute(task, thrown);
        }
    } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
    }
}

如果退出循环,说明方法返回。会执行到中的方法,此方法是用来清理线程池中添加的数据,代表是异常情况下退出。

try {
    while (task != null || (task = getTask()) != null) {

    }
    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly);
}

中只是启动了当前线程工作,还需要源源不断通过方法从来获取任务执行。在没有任务的时候,根据线程池和核心线程数的对比结果来使用执行清理工作。

getTask()源码分析

方法用于从阻塞队列中获取任务,如果当前线程小于核心线程,那么当阻塞队列中没有任务时就会阻塞,反之会等待后返回。

这个就是的使用含义:非核心的空闲线程等待新任务的时间,当然如果这里设置了也会回收核心线程。

具体代码如下:

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

这里核心代码就是从中取任务,采用还是取决于和线程数量,在构造后设置的,默认为false。如果设置为则代表核心线程数下的线程也是可以被回收的。如果使用则表明中没有任务当前线程就会被阻塞挂起,直到有了新的任务才会被唤醒。

在这里扩展下阻塞队列的部分方法的含义,这里主要是看和的使用区别:阻塞队列插入方法:

boolean add(E e):队列没有满,则插入数据并返回true;队列满时,抛出异常 java.lang.IllegalStateException: Queue full。boolean offer(E e):队列没有满,则插入数据并返回true;队列满时,返回false。void put(E e):队列没有满,则插入数据;队列满时,阻塞调用此方法线程,直到队列有空闲空间时此线程进入就绪状态。boolean offer(E e, long timeout, TimeUnit unit):队列没有满,插入数据并返回true;队列满时,阻塞调用此方法线程,若指定等待的时间内还不能往队列中插入数据,返回false。

阻塞队列移除(获取)方法:

E remove():队列非空,则以FIFO原则移除数据,并返回该数据的值;队列为空,抛出异常 java.util.NoSuchElementException。

E poll(): 队列非空,移除数据,并返回该数据的值;队列为空,返回null。

E take(): 队列非空,移除数据,并返回该数据的值;队列为空,阻塞调用此方法线程,直到队列为非空时此线程进入就绪状态。

E poll(long timeout, TimeUnit unit):队列非空,移除数据,并返回该数据的值;队列为空,阻塞调用此方法线程,若指定等待的时间内队列都没有数据可取,返回null。

阻塞队列检查方法:

E element(): 队列非空,则返回队首元素;队列为空,抛出异常 java.util.NoSuchElementException。E peek(): 队列非空,则返回队首元素;队列为空,返回null。

processWorkerExit()源码分析

此方法的含义是清理当前线程,从线程池中移除掉刚刚添加的对象。

执行代表在线程跳出了当前循环,一般有两种情况:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly)、
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);
    }
}

针对于线程池的操作都会进行加锁处理,然后将当前从池子中移除,累加当前线程池完成的任务总数。

接着调用尝试关闭线程池,这个方法后面有详细说明。

接着判断,含义是当前线程池状态小于,即当前线程池状态当前线程池状态为 或 ,判断当前线程是否是正常退出。如果当前线程是正常退出,那么,接着判断线程池中是否还拥有足够多的的线程,因为异常退出可能导致线程池中线程数量不足,此时就要执行为线程池添加新的数据,看下面的详细分析:

执行最后的addWorke()有三种可能:1)当前线程在执行时 发生异常,这里一定要创建一个新顶上去。2)如果说明任务队列中还有任务,这种情况下最起码要留一个线程,因为当前状态为 RUNNING || SHUTDOWN这是前提条件。3)当前线程数量 < corePoolSize值,此时会创建线程,维护线程池数量在个水平。

tryTerminate()源码分析

上面移除的方法中有一个方法的调用,这个方法是根据线程池状态尝试关闭线程池。

执行流程如下:

实现源码如下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

首先是判断线程池状态:条件一:isRunning(c) 成立,直接返回就行,线程池很正常!条件二:runStateAtLeast(c, TIDYING) 说明 已经有其它线程 在执行 TIDYING -> TERMINATED状态了,当前线程直接回去。条件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())SHUTDOWN特殊情况,如果是这种情况,直接回去。得等队列中的任务处理完毕后,再转化状态。

接着执行:

if (workerCountOf(c) != 0) {
    interruptIdleWorkers(ONLY_ONE);
    return;
}

走到这个逻辑,说明线程池状态 >= STOP或者线程池状态为且队列已经空了

当前线程池中的线程数量 > 0,调用中断一个空闲线程,然后返回。我们来分析下,在返回为空时会执行退出逻辑,这里就会调用方法尝试关闭线程池。

如果此时线程池状态满足线程池状态 >= STOP或者线程池状态为且队列已经空了,如果此时线程池中线程数不为0,就会中断一个空闲线程。为什么这里只中断一个线程呢?这里的设计思想是,如果线程数量特别多的话,只有一个线程去做唤醒空闲的任务可能会比较吃力,所以,就给了每个 被唤醒的线程 ,在真正退出之前协助 唤醒一个空闲线程的任务,提供吞吐量的一种常用手段。

我们顺便看下源码:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

遍历,如果线程是空闲状态(空闲状态:queue.take()和queue.poll()返回空),则给其一个中断信号,如果是处于阻塞的线程,会被唤醒,唤醒后,进入下一次自旋时,可能会执行退出相关的逻辑,接着又会调用,回到上面场景,当前线程退出的时候还是会继续唤醒下一个空现线程。

接着往下看的剩余逻辑:

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
            terminated();
        } finally {
            ctl.set(ctlOf(TERMINATED, 0));
            termination.signalAll();
        }
        return;
    }
} finally {
    mainLock.unlock();
}

执行到这里的线程是谁? 时,会来到这里。最后一个退出的线程。 在 (线程池状态 >= STOP || 线程池状态为 SHUTDOWN 且 队列已经空了)线程唤醒后,都会执行退出逻辑,退出过程中 会 先将 workerCount计数 -1 => ctl -1。调用 方法之前,已经减过了,所以0时,表示这是最后一个退出的线程了。

获取全局锁,进行加锁操作,通过设置线程池状态为状态,设置成功则执行方法,这也是一个自定义扩展的方法,当线程池中止的时候会调用此方法。

最后设置线程池状态为状态,唤醒调用方法的线程。

awaitTermination()源码分析

该方法是判断线程池状态是否达到,如果达到了则直接返回,没有达到则会挂起当前线程指定的时间。

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

在每次执行后会唤醒所有被的线程,继续判断线程池状态。

shutDown()/shutDownNow()源码分析

和方法都是直接改变线程池状态的方法,一般我们在系统关闭之前会调用此方法优雅的关闭线程池。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List shutdownNow() {
    List tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

和方法调用差不多,只是是将线程池状态设置为,是将线程池状态设置为。会返回所有未处理的集合。

来看看它们共同调用的一些方法:

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

这个方法是设置线程池状态为指定状态,,判断当前线程池值,如果小于则会往后执行。ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))),通过指令,修改中线程池状态为传入的。

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

含义是为空闲的线程设置中断标识,这里要清楚什么时候空闲?我们在上面讲解方法时,执行之前,要针对对象加锁,设置中的值为1,防止运行的被添加中断标识。接着执行方法,获取阻塞队列中的任务,如果是则会阻塞挂起当前线程,释放锁,此时线程处于空闲状态。如果是返回为空,会释放锁,此时线程也是空闲状态。

执行后处于阻塞的线程,会被唤醒,唤醒后,进入下一次自旋判断线程池状态是否改变,如果改变可能直接返回空,这里具体参看和方法。

也是一个扩展方法,需要子类去重写,这里代表当线程池关闭后需要做的事情。方法是获取中现有的的任务列表。

问题回顾

总结

这篇线程池源码覆盖到了中大部分代码,我相信认真阅读完后肯定会对线程池有更深刻的理解。如有疑问或者建议可关注公众号给我私信,我都会一一为大家解答。

另外推荐一个我的up主朋友,他自己录制了好多学习视频并分享在B站上了,大家有时间可以看一下(PS:非恰饭非利益相关,良心推荐):