Bootstrap

ThreadPoolExecutor源码解读(二)execute提交任务,Worker详解。如何执行任务?如何回收空闲线程?

一、前言

了解了线程池基本属性的概念是远远不够的,还需要知道每一个属性在源码中的体现,比如提交任务的过程中是如何将核心线程数、工作队列、最大线程数以及拒绝策略等连起来的?工作线程是如何执行任务代码的?线程池是如何回收空闲线程的?

默认情况下,刚初始化好的线程池是没有任何存活的线程的,等到有任务提交才创建线程执行任务。如果实际使用中希望线程池初始时尽快执行任务,可以调用或者方法,预先在线程池启动几个工作线程,等待任务提交并执行。

//循环启动corePoolSize个工作线程
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}
//在corePoolSize范围内启动一个工作线程
//个人认为无需在外面判断workerCountOf(ctl.get()) < corePoolSize
//因为addWorker里面会判断线程数是否满足corePoolSize
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}

二、execute()提交任务

线程池是一个生产者消费者模式,提交任务是生产者,工作线程从工作队列拉取任务执行是消费者。代码逻辑主要分为4步:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //1、worker数小于corePoolSize,直接创建worker线程
        if (addWorker(command, true))
            //创建成功,返回
            return;
        c = ctl.get();
    }
    //2、线程数大于等于corePoolSize,任务加入workQueue阻塞队列
    //2.1.判断线程池状态是否为running,是并加入阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        //2.2.再次检查线程池状态是否running
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            //2.2.1 线程池不在running态,将任务从阻塞队列删除,调用拒绝策略
            reject(command);
        //若线程数为0则启动一个空线程从workQueue中取任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //不是running或者任务加入阻塞队列失败(阻塞队列满了)
    //3、判断maximumPoolSize,阻塞队列已满且小于maximumPoolSize,创建新的worker
    else if (!addWorker(command, false))
        //4、大于等于maximumPoolSize,拒绝策略
        reject(command);
}

当线程数大于等于,任务被加入时,对线程池的状态进行了,这是有必要的:

当任务加入后判断此时工作线程数为0,则启动一个空工作线程消费中的任务。

三、addWorker() 创建并启动工作线程

的核心在,它是如何创建并启动一个工作线程的呢?第二个参数为true时线程数与比较,false时线程数与比较。

代码很长,有两个大的for循环,逻辑主要涉及2步:

  • 第一个for循环:自旋CAS操作加1。

  • 第二个for循环:创建并启动新Worker线程。

在加1和创建worker时都反复判断线程池当前的运行状态,是为了当线程池调用了或者时做出相应的措施。

private boolean addWorker(Runnable firstTask, boolean core) {
    //1、这个循环主要是为了自旋 cas  workerCount+1,成功之后就创建worder
    // 因为代码块没有加锁,且是有可能多个线程同时操作,所以采用了cas乐观锁的方式。
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //线程池状态 大于等于SHUTDOWN(不是running),且非(SHUTDOWN 且task为null且阻塞队列不为空)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //worker数>=CAPACITY 或者worker数>=corePoolSize or maximumPoolSize 直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //cas workercount+1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //2、创建worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //1、new Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //1.1、这里需要加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //当获取锁后再次检查 线程池状态
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //工作线程已经是startable
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //2、工作线程还未启动,加入workers(HashSet)
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        //2.2、实时修改largestPoolSize = s
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动工作线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //worder线程启动失败后的一些措施
            //1、从workers删除worder
            //2、worderCount递减
            //3、尝试终止线程池
            addWorkerFailed(w);
    }
    return workerStarted;
}

第一个for循环,自旋CAS加1的过程中判断线程数是否满足或者,加1成功结束外围循环,也有可能在CAS的过程中改变导致失败则重新判断、自旋重试。

第二个for循环,创建工作线程,并启动,启动的过程中需要加锁,因为启动完Worker,还需要将其加入到中,并将赋值给;如果启动失败,则需要一些措施:

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            //1、从workers删除worder
            workers.remove(w);
        //2、worderCount cas自旋递减
        decrementWorkerCount();
        //3、如果线程池正处于销毁的过程,则尝试自旋终止线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

四、Worker类是一把锁也是一个线程

中创建的工作线程是对象,它是的内部类,继承于,相当于是一把锁,实现了接口,又是一个线程。

有两个核心的成员变量和。因为本身是一个线程,初始化时会被包装创建为一个赋值给,方法中直接调用启动线程;是提交进来的任务,直接调用的run()函数,执行任务代码。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    //Worker是不可能被序列化的,但是为了禁止javac告警而提供serialVersionUID
    private static final long serialVersionUID = 6138294804551838833L;
    //这个线程非常的重要 Worker就是运行在其中的
    final Thread thread;
    //任务线程,并不会当做线程去start而是直接调用run()
    Runnable firstTask;
    //数据统计,可以忽略
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        //默认ThreadFactory,创建工作线程
        this.thread = getThreadFactory().newThread(this);
    }
}

1、runWorker执行任务

线程启动后会自动条用其函数:

当刚start时,直接拿的成员变量来执行任务代码,当为空时,从工作队列中循环取出执行,而工作队列为空则阻塞等待。

在代码中提供了两个空函数和,一般称之为钩子函数,用户可以自行继承时,将其重写,加上一些业务逻辑。

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //这里是一个空函数,使用者可以自行继承ThreadPoolExecutor 并重写beforeExecute
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //直接调用了run函数
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //这也是一个空函数,使用者可以自行继承ThreadPoolExecutor 并重写afterExecute
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //如果从阻塞队列取不到任务,将会让当前的工作线程退出
            processWorkerExit(w, completedAbruptly);
        }
}

源码中显而易见,直接调用的是任务的,并且捕获了异常,却只是上抛并没有处理。如果提交的任务方法中有异常,用户没有自行捕获,捕获了,则会结束循环,并会将当前线程销毁,影响线程池的正常使用,所以强制要求任务方法中任务代码,防患于未然。

是一把锁,在循环执行任务时会上锁,其目的是确保线程,除了线程池销毁导致中断外,没有其他中断的设置。在调用关闭线程池时,会中断空闲的工作线程,就是通过遍历,判断是否能获取线程的锁,而定义线程是否为空闲。源码英文注释是这样描述的:

Before running any task, the lock is acquired to prevent other pool interrupts

while the task is executing, and then we ensure that unless pool is stopping,

this thread does not have its interrupt set.

2、getTask()决定Worker的生死命运

是从中获取任务,涉及了如何从队列中取任务,以及决定了工作线程的生死命运。

返回null是一件非常恐怖的事情,因为会因为返回null而结束循环,从而执行销毁退出线程的逻辑,也就是回收线程。返回null的情况:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //大于maximumPoolSize时,将不从阻塞队列中取
        //大于corePoolSize 或者 allowCoreThreadTimeOut 为true,且超时了不从阻塞队列中取
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //通过timed判断是用poll还是take,
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

通过判断如何从队列中取任务:

一般情况下,只有超过corePoolSize的线程具有超时效果,受keepAliveTime的支配。如果使用者将设置为true,则所有的工作线程都会因为空闲超时而被回收。

3、processWorkerExit因runWorker循环结束而回收Worker

因循环结束而回收,主要做了如下几件事:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        //从workers删除worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //重试终止,如果线程池状态为running 直接返回
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        //stop以内,completedAbruptly=false
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //allowCoreThreadTimeOut=true,且没有正在运行的工作线程
        //或者allowCoreThreadTimeOut=false,工作线程数没有达到corePoolSize
        //则创建一个空工作线程作为弥补。
        addWorker(null, false);
    }
}

中局部变量首先设置为true,循环结束则设置为false,但是若是因为任务代码异常,则不会执行到,所以不会再销毁的当前的时候再启动一个空弥补上。

什么时候会再启动一个空呢?当线程池的状态小于STOP,也就是RUNNING或者SHUTDOWN,且不是因为任务代码异常结束循环时,继续判断是否需要启动一个空线程作为弥补:

五、总结

PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!