Bootstrap

过滤Spark数据集的四种方法

在实际工作中,根据某个字段,对一个Spark数据集进行过滤,是一个很常见的场景,举个例子:

一个存储公司员工信息的数据集A,有以下三个字段:

id: Integer
name: String
age: Integer

现在要过滤出某些员工的id,这些id在B集合(B可能是哈希表,也可能是Spark数据集)中,过滤逻辑为:

C = A.filter(A.id in B)

有四种方法可以实现,分别为:

  • Filter

  • Map

  • MapPartition

  • Inner Join

下面详细介绍。

Filter

Spark的Filter变换,可以根据条件表达式、返回布尔值的过滤函数、条件字符串,对数据集进行过滤,使用方法如下:

// 1. 条件表达式
A1 = A.filter(Column condition)
// 2. 自定义过滤函数
A1 = A.filter(FilterFunction func)
// 3. 条件字符串
A1 = A.filter(String condition)

Filter 变换比较简单,逐条处理记录不论数据集大小,效率都很高,但需要能够将用来过滤的数据集B广播到所有的executor上。

Map

Map变换,对数据集中每条记录调用一个函数,返回值可以是null,也可以是相同类型或不同类型的新记录,使用方法如下:

// encoder参数用来指定输出类型
A2 = A.map(MapFunction func, Encoder encoder)

通过Map变换实现过滤的话,只需要在Map变换中,将符合条件的记录原样返回,不符合条件的记录返回null即可。

可以看到,Map变换的语义和Filter变换的语义相似,都是逐条处理记录,但Map需要提供一个额外的Encoder,故没有Filter简单和优雅,且因为输出要过滤null值,所以效率不如Filter。

MapPartitions

MapPartitions变换,与Map变换类似,但映射函数不是在每条记录上调用,而是在分区级别调用,使用方法如下:

// func的输入和输出都是Iterator类型
A3 = A.map(MapPartitionsFunction func, Encoder encoder)

MapPartitions在分区级别进行操作,而不是记录级别,因此比Filter和Map效率更高。缺点的话,首先和Map一样,需要提供一个额外的Encoder,此外,当分区过大,超过executor所能提供的内存时,任务会失败,因此可靠性不如Map和Filter。

Inner Join

以员工id相等为Inner Join的条件,然后只要A集合中的字段,同样可以实现过滤,使用方法:

// join表达式可能为 A("id") === B("id")
A4 = A.join(Dataset B, Column joinExprs)

Inner Join和Filter一样,效率和可靠性都有保证,且对B集合的类型和大小都没有偏好。

总结

在本文描述的过滤场景中,综合考虑效率和可靠性,如果用来过滤的集合比较小,可以广播到所有的executor中,那么选择Filter变换为佳,如果B集合很大,则Inner Join更合适。

参考链接:

公众号:大数志

传递最新、最有价值的大数据技术干货和资讯。