Bootstrap

Java并发编程系列——Fork-Join

上次回顾提到要写下线程池,这篇呢还是先放一下,再继续一下线程使用中的一些相关概念,所以本篇来通过一个例子介绍Java中的Fork-Join,当然例子中实现用到了ForkJoinPool,也是线程池的一种。

首先,我们提出一个要解决的问题,而且经过分析后,认为该例子用多线程实现是比较适合的。

问题:假设在当前的系统中有这样一个业务,有一个树形的工程结构,深度为三层,第一层为根节点,类型为Project,第二层为SubProject,最底层为Item。规则上Project只有一个,Project下可以包含若干的SubProject,而SubProject下有若干的Item。该结构有个重要的功能,是通过一层层的计算来确定工程的费用,每一层有自己的费用,而上层的费用则是下层费用的汇总加上该层自身的费用。

有了这个问题我们就来分析下题目如何解决。从规则上看,这个问题本身是诸多独立计算的汇总,也就是说可以把当前的一个大问题分解为若干的相同规则的小问题,而且彼此独立,而且在单线程下整个计算相当耗时(如果发现要解决的问题在多线程下完全没有优势,比如时间上的优势,程序结构上的优势等,则实现多线程的意义不大),这样分析下来,是符合使用Fork-Join的方式来实现的。

既然使用Fork-Join来实现,我们就先介绍下实现Fork-Join的一般形式:

class MyTask extens RecursiveTask/RecursiveAction/ForkJoinTask{
  compute() {
    if(condition) {
      doCompute;
      return result;
    } else {
      tasks = doSplitTask();
      invokeAll(tasks)
      return tasks.foreach.join;
    }
  }
}

ForkJoinPool pool;
MyTask task = new MyTask();
pool.invoke(task);
result = task.join();

解释一下这段伪码,首先是定义任务类,任务类继承自如上伪码中的任一个,RecursiveTask与RecursiveAction都继承自ForkJoinTask,通常使用这两个就可以,而这两者的主要区别是RecursiveTask可以带返回值,而RecursiveAction无返回值。实现compute方法,在compute中决定是否要继续拆分,并实现业务逻辑,最后使用ForkJoinPool的invoke来调用初始任务。

接下来我们就分步来实现,先定义我们的树结构

public static class TreeNode {
    public TreeNode(NodeType type, long amount, TreeNode parent) {
        this.type = type;
        this.amount = amount;
        if (parent != null) {
            this.parent = parent;
            this.parent.children.add(this);
        }
    }

    public long calcCost() {
        return this.amount;
    }

    private NodeType type;
    private TreeNode parent;
    private long amount;
    private List children = new ArrayList<>();
}

其中节点类型,我们定义一个枚举来表示

public enum NodeType {
    PROJECT, SUB_PROJECT, ITEM
}

接下来定义任务类

public static class ComputeTask extends RecursiveTask {
    private TreeNode node;

    public ComputeTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Long compute() {
        if (NodeType.SUB_PROJECT.equals(node.type)) {
            for (TreeNode item : this.node.children) {
                this.node.amount += item.calcCost();
            }
        } else {
            List tasks = new ArrayList<>();
            for (TreeNode treeNode : this.node.children) {
                ComputeTask task = new ComputeTask(treeNode);
                tasks.add(task);
            }
            invokeAll(tasks);
            for (ComputeTask task : tasks) {
                this.node.amount += task.join();
            }
        }
        return this.node.calcCost();
    }
}

假设我们任务拆分规则为仅拆分到SubProject这一层,避免任务拆分过于细碎。

定义一个初始化方法,用来构造一个要计算的工程树。

private static TreeNode constructProject() {
    TreeNode project = new TreeNode(NodeType.PROJECT, 0, null);
    for (int i = 0; i < 20; i++) {
        TreeNode subProject = new TreeNode(NodeType.SUB_PROJECT, 0, project);
        for (int j = 0; j < 5000; j++) {
            new TreeNode(NodeType.ITEM, j, subProject);
        }
    }
    return project;
}

最后是如何调用并输出结果。

public static void main(String[] args) {
    TreeNode project = constructProject();
    ForkJoinPool pool = new ForkJoinPool();
    ComputeTask task = new ComputeTask(project);
    pool.invoke(task);
    System.out.println(task.join());
}

这样一个工程树的计算就完成了。

这里仅举个例子来说明Fork-Join的用法,在实际使用过程中需要注意,invoke方式因为内部有执行join,所以可以说是一种同步的方式,而如果使用execute方法,则相当于异步的方式,在使用中需要注意应用场景。比如该示例中,最后如果不使用task.join()而直接获取project.amout,同样可以得到结果,但改成execute执行任务,而不使用join()获取结果,在同样位置是得不到正确的结果的。

本文仅较简略地介绍Fork-Join的一般使用方式,像Fork-Join内部实现了工作密取,对于这一点,先了解有这样一个机制。

下一篇应该会讲一下线程的一些常用工具类。

欢迎关注公众号“像什么”

本系列其他文章: