Bootstrap

【高并发】从源码角度分析创建线程池究竟有哪些方式

大家好,我是冰河~~

在Java的高并发领域,线程池一直是一个绕不开的话题。有些童鞋一直在使用线程池,但是,对于如何创建线程池仅仅停留在使用Executors工具类的方式,那么,创建线程池究竟存在哪几种方式呢?就让我们一起从创建线程池的源码来深入分析究竟有哪些方式可以创建线程池。

使用Executors工具类创建线程池

在创建线程池时,初学者用的最多的就是Executors 这个工具类,而使用这个工具类创建线程池时非常简单的,不需要关注太多的线程池细节,只需要传入必要的参数即可。Executors 工具类提供了几种创建线程池的方法,如下所示。

  • Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程

  • Executors.newFixedThreadPool:创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待

  • Executors.newScheduledThreadPool:创建一个定长的线程池,支持定时、周期性的任务执行

  • Executors.newSingleThreadExecutor: 创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指定顺序(先入先出或者优先级)执行

  • Executors.newSingleThreadScheduledExecutor:创建一个单线程化的线程池,支持定时、周期性的任务执行

  • Executors.newWorkStealingPool:创建一个具有并行级别的work-stealing线程池

其中,Executors.newWorkStealingPool方法是Java 8中新增的创建线程池的方法,它能够为线程池设置并行级别,具有更高的并发度和性能。除了此方法外,其他创建线程池的方法本质上调用的是ThreadPoolExecutor类的构造方法。

例如,我们可以使用如下代码创建线程池。

Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);

使用ThreadPoolExecutor类创建线程池

从代码结构上看ThreadPoolExecutor类继承自AbstractExecutorService,也就是说,ThreadPoolExecutor类具有AbstractExecutorService类的全部功能。

既然Executors工具类中创建线程池大部分调用的都是ThreadPoolExecutor类的构造方法,所以,我们也可以直接调用ThreadPoolExecutor类的构造方法来创建线程池,而不再使用Executors工具类。接下来,我们一起看下ThreadPoolExecutor类的构造方法。

ThreadPoolExecutor类中的所有构造方法如下所示。

public ThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
           BlockingQueue workQueue) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue workQueue,
              ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
   threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
              TimeUnit unit,
        BlockingQueue workQueue,
        RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
   Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
              BlockingQueue workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
  if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.acc = System.getSecurityManager() == null ?
      null :
      AccessController.getContext();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

由ThreadPoolExecutor类的构造方法的源代码可知,创建线程池最终调用的构造方法如下。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        BlockingQueue workQueue,
        ThreadFactory threadFactory,
              RejectedExecutionHandler handler) {
  if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.acc = System.getSecurityManager() == null ?
      null :
      AccessController.getContext();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

关于此构造方法中各参数的含义和作用,各位可以移步《》进行查阅。

大家可以自行调用ThreadPoolExecutor类的构造方法来创建线程池。例如,我们可以使用如下形式创建线程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue());

使用ForkJoinPool类创建线程池

在Java8的Executors工具类中,新增了如下创建线程池的方式。

public static ExecutorService newWorkStealingPool(int parallelism) {
  return new ForkJoinPool
    (parallelism,
     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     null, true);
}

public static ExecutorService newWorkStealingPool() {
  return new ForkJoinPool
    (Runtime.getRuntime().availableProcessors(),
     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     null, true);
}

从源代码可以可以,本质上调用的是ForkJoinPool类的构造方法类创建线程池,而从代码结构上来看ForkJoinPool类继承自AbstractExecutorService抽象类。接下来,我们看下ForkJoinPool类的构造方法。

public ForkJoinPool() {
  this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
     defaultForkJoinWorkerThreadFactory, null, false);
}
 public ForkJoinPool(int parallelism) {
  this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism,
        ForkJoinWorkerThreadFactory factory,
        UncaughtExceptionHandler handler,
        boolean asyncMode) {
  this(checkParallelism(parallelism),
     checkFactory(factory),
     handler,
     asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
     "ForkJoinPool-" + nextPoolId() + "-worker-");
  checkPermission();
}

private ForkJoinPool(int parallelism,
         ForkJoinWorkerThreadFactory factory,
         UncaughtExceptionHandler handler,
         int mode,
         String workerNamePrefix) {
  this.workerNamePrefix = workerNamePrefix;
  this.factory = factory;
  this.ueh = handler;
  this.config = (parallelism & SMASK) | mode;
  long np = (long)(-parallelism); // offset ctl counts
  this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

通过查看源代码得知,ForkJoinPool的构造方法,最终调用的是如下私有构造方法。

private ForkJoinPool(int parallelism,
         ForkJoinWorkerThreadFactory factory,
         UncaughtExceptionHandler handler,
         int mode,
         String workerNamePrefix) {
  this.workerNamePrefix = workerNamePrefix;
  this.factory = factory;
  this.ueh = handler;
  this.config = (parallelism & SMASK) | mode;
  long np = (long)(-parallelism); // offset ctl counts
  this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

其中,各参数的含义如下所示。

  • parallelism:并发级别。

  • factory:创建线程的工厂类对象。

  • handler:当线程池中的线程抛出未捕获的异常时,统一使用UncaughtExceptionHandler对象处理。

  • mode:取值为FIFO_QUEUE或者LIFO_QUEUE。

  • workerNamePrefix:执行任务的线程名称的前缀。

当然,私有构造方法虽然是参数最多的一个方法,但是其不会直接对外方法,我们可以使用如下方式创建线程池。

new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

使用ScheduledThreadPoolExecutor类创建线程池

在Executors工具类中存在如下方法类创建线程池。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1, threadFactory));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

从源码来看,这几个方法本质上调用的都是ScheduledThreadPoolExecutor类的构造方法,ScheduledThreadPoolExecutor中存在的构造方法如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory, handler);
}

而从代码结构上看,ScheduledThreadPoolExecutor类继承自ThreadPoolExecutor类,本质上还是调用ThreadPoolExecutor类的构造方法,只不过此时传递的队列为DelayedWorkQueue。我们可以直接调用ScheduledThreadPoolExecutor类的构造方法来创建线程池,例如以如下形式创建线程池。

new ScheduledThreadPoolExecutor(3)

好了,今天就到这儿吧,我是冰河,我们下期见~~