Bootstrap

结语:Apache Spark 3_0(十二)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与AI相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI等相关文章,目前已经输出了40万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。

在我们写这本书的时候,Apache Spark 3.0还没有正式发布,它仍在开发中,我们开始使用Spark 3.0.0- preview2。本书中的所有代码示例都在Spark 3.0.0- preview2上进行过测试,它们与Spark 3.0官方版本的工作原理没有什么不同。在相关的章节中,我们尽可能地提到了什么时候特性是新增加的,它在Spark 3.0中有什么特点。在本章中,我们概述了这些变化。

缺陷修复和功能增强有很多,因此,为了简洁起见,我们仅重点介绍与Spark组件有关的一些显著更改和功能。某些新功能实际上是高级功能,不在本书的讨论范围之内,但是我们在这里提到这些功能是为了让你在发行版可用时可以探索它们。

Spark Core和Spark SQL

首先,让我们考虑一下幕后的新功能。Spark Core和Spark SQL引擎中引入了许多更改,以帮助加快查询速度。加快查询的一种方法是使用动态分区修剪来读取较少的数据。另一个是在执行过程中调整和优化查询计划。

动态分区修剪

动态分区修剪(DPP)背后的想法是跳过查询结果中不需要的数据。DPP最佳的典型方案是连接两个表:事实表(分区在多列上)和维度表(未分区),如图12-1所示。通常,过滤器位于表的非分区侧(在本例中为Date)。例如,考虑这个基于两个表Sales和Date的查询:

-- In SQL

SELECT * FROM Sales JOIN Date ON Sales.date = Date.date

DPP中的关键优化技术是从维度表中获取过滤器的结果,并将其注入到事实表中,作为扫描操作的一部分,以限制读取的数据,如图12-1所示。

考虑维表小于事实表并执行连接的情况,如图12-2所示。在这种情况下,Spark很可能会进行广播连接(在第7章中进行了讨论)。在此连接期间,Spark将执行以下步骤以最大程度地减少从较大的事实表中扫描的数据量:

1. 在连接的维度方面,Spark将根据该维度表构建一个哈希表(也称为构建关系),作为此过滤器查询的一部分。

2. Spark会将查询结果插入哈希表,并将其分配给广播变量,该变量将分发给参与此连接操作的所有执行程序。

3. 在每个执行程序上,Spark都会探测广播的哈希表,以确定要从事实表中读取哪些对应的行。

4. 最后,Spark将把此过滤器动态注入事实表的文件扫描操作中,并重用广播变量的结果。这样,作为对事实表的文件扫描操作的一部分,仅扫描与过滤器匹配的分区,并且仅读取所需的数据。

默认情况下启用,因此你不必显式配置它,当在两个表之间执行连接时,所有这些都是动态发生的。通过DPP优化,Spark 3.0可以更好地处理星型模式查询。

自适应查询执行

Spark 3.0优化查询性能的另一种方法是在运行时调整其物理执行计划。自适应查询执行(AQE)根据查询执行过程中收集的运行时统计信息重新优化和调整查询计划。它尝试在运行时执行以下操作:

通过减少shuffle分区的数量来减少shuffle阶段中的reducer的数量。

优化查询的物理执行计划,例如通过将SortMergeJoin在合适的时间点转换为BroadcastHashJoin。

处理连接期间的数据倾斜。

所有这些自适应措施都在运行时计划执行期间发生,如图12-3所示。要在Spark 3.0中使用AQE,请将配置设置spark.sql.adaptive.enabled为true。

AQE框架

查询中的Spark操作被流水线化并在并行进程中执行,但是改组或广播交换中断了该流水线,因为需要将一个阶段的输出作为下一阶段的输入(请参阅本章中的“步骤3:了解Spark应用程序概念”)。这些断点在查询阶段称为实现点,它们提供了重新优化和重新检查查询的机会,如图12-4所示。

如图所示,这是AQE框架要迭代的概念性步骤:

1. 执行每个阶段的所有叶节点,例如扫描操作。

2. 实现点完成执行后,将其标记为完成,并且在执行期间获得的所有相关统计信息都会在其逻辑计划中进行更新。

3. 基于这些统计信息,例如读取的分区数,读取的数据字节等,该框架再次运行Catalyst优化器以了解它是否可以:

a. 合并分区的数量以减少用于读取随机数据的缩减器的数量。

b. 根据读取的表的大小,用广播连接替换排序合并连接。

c. 尝试纠正数据倾斜。

d. 创建一个新的优化逻辑计划,然后创建一个新的优化物理计划。

 

重复此过程,直到执行了查询计划的所有阶段。

简而言之,这种重新优化是动态完成的,如图12-3所示,其目标是动态合并shuffle分区,减少读取shuffle输出数据所需的reducer数量,在适当的情况下切换连接策略,并补救任何倾斜连接。

两种Spark SQL配置指示AQE如何减少reducer的数量:

l spark.sql.adaptive.coalescePartitions.enabled(设置为true)

l spark.sql.adaptive.skewJoin.enabled(设置为true)

在撰写本文时,Spark 3.0社区博客,文档和示例尚未公开发布,但在发布时它们应该已经公开。如果你希望了解这些功能的工作原理,这些资源将使你获得更多详细信息,包括有关如何注入SQL连接提示的信息,下面将进行讨论。

SQL连接提示

添加到现有的BROADCAST提示进行连接,在Spark3.0中为所有的Spark Join连接策略增加了连接提示(见第七章的“Spark的Join连接策略”)。此处提供了每种连接类型的示例。

随机排序合并连接(SMJ)

有了这些新的提示,你可以建议Spark在连接表a和b或customers和orders 时执行SortMergeJoin ,如以下示例所示。你可以在SELECT /*+ ... */注释块内的语句中添加一个或多个提示:

SELECT /*+ MERGE(a, b) */ id FROM a JOIN b ON a.key = b.key

SELECT /*+ MERGE(customers, orders) */ * FROM customers, orders WHERE

orders.custId = customers.custId

广播哈希连接(BHJ)

同样,对于广播哈希连接,你可以向Spark提供提示,表明你更喜欢广播连接。例如,在这里,我们广播表a与表b连接以及表customers与表orders连接:

SELECT /*+ BROADCAST(a) */ id FROM a JOIN b ON a.key = b.key

SELECT /*+ BROADCAST(customers) */ * FROM customers, orders WHERE

orders.custId = customers.custId 

Shuffle哈希连接(SHJ)

你可以通过类似的方式提供提示以执行随机哈希连接,尽管与前两种受支持的连接策略相比,这种情况很少见:

SELECT /*+ SHUFFLE_HASH(a, b) */ id FROM a JOIN b ON a.key = b.key

SELECT /*+ SHUFFLE_HASH(customers, orders) */ * FROM customers, orders WHERE  orders.custId = customers.custId

随机复制嵌套循环连接(SNLJ)

最后,shuffle和复制嵌套循环连接遵循类似的形式和语法:

SELECT /*+ SHUFFLE_REPLICATE_NL(a, b) */ id FROM a JOIN b

目录插件API和DataSourceV2

Spark 3.0的实验性DataSourceV2 API不仅限于Hive元存储和目录,还扩展了Spark生态系统并为开发人员提供了三个核心功能。具体来说包括:

l 允许插入用于目录和表管理的外部数据源

l 通过ORC,Parquet,Kafka,Cassandra,Delta Lake和Apache Iceberg等受支持的文件格式,支持将谓词下推到其他数据源。

l 提供统一的API,用于接收和来源的数据源的流式处理和批处理

针对希望扩展Spark使用外部源和接收器功能的开发人员,目录API提供了SQL和编程API来从指定的可插入目录中创建,更改,加载和删除表。该目录提供了在不同级别执行的功能和操作的分层抽象,如图12-5所示。

Spark和特定连接器之间的初始交互是解析与其实际Table对象的关系。Catalog定义如何在此连接器中查找表。此外,Catalog可以定义如何修改自己的元数据,从而支持CREATE TABLE,ALTER TABLE等操作。

例如,在SQL中,你现在可以发出命令来为目录创建命名空间。要使用可插入目录,请在spark-defaults.conf文件中启用以下配置:

spark.sql.catalog.ndb_catalog com.ndb.ConnectorImpl # connector implementation

spark.sql.catalog.ndb_catalog.option1 value1

spark.sql.catalog.ndb_catalog.option2 value2 

在此,数据源目录的连接器有两个选项:option1->value1和option2->value2。定义它们之后,Spark或SQL中的应用程序用户可以使用DataFrameReader和DataFrameWriter API方法或带有这些已定义选项的Spark SQL命令作为数据源操作的方法。例如:

-- In SQL

SHOW TABLES ndb_catalog;

CREATE TABLE ndb_catalog.table_1;

SELECT * from ndb_catalog.table_1;

ALTER TABLE ndb_catalog.table_1

 

// In Scala

df.writeTo("ndb_catalog.table_1")

val dfNBD = spark.read.table("ndb_catalog.table_1")

  .option("option1", "value1")

  .option("option2", "value2")

这些目录插件API扩展了Spark利用外部数据源作为接收器和源的能力,但它们仍处于试验阶段,不应在生产中使用。有关使用它们的详细指南超出了本书的范围,但是如果你想将自定义连接器写入外部数据源作为目录来管理外部表及其相关联的目录,我们建议你查看发行文档以获取更多信息。元数据。

 

前面的代码段是定义和实现目录连接器并用数据填充它们后代码的示例。

加速器感知调度器(Accelerator-Aware Scheduler)

Project Hydrogen是一项将AI和大数据整合在一起的社区计划,其主要目标是三个:实施障碍执行模式,感知加速器的计划以及优化的数据交换。Apache Spark 2.4.0中引入了屏障执行模式的基本实现。在Spark 3.0中,已实现了基本的调度程序,以利用硬件加速器(例如目标平台上的GPU),在该平台上,Spark以独立模式,YARN或Kubernetes部署。

为了让Spark以有组织的方式利用这些GPU来处理使用它们的特殊工作负载,你必须指定可通过配置使用的硬件资源。然后,你的应用程序可以在探测脚本的帮助下发现它们。在你的Spark应用程序中,启用GPU使用是一个三步过程:

编写探测脚本,以发现每个Spark执行程序上可用的基础GPU的地址。该脚本在以下Spark配置中设置:

spark.worker.resource.gpu.discoveryScript =/path/to/script.sh

为你的Spark执行者设置配置以使用以下发现的GPU:

spark.executor.resource.gpu.amount = 2

spark.task.resource.gpu.amount = 1

编写RDD代码以利用这些GPU来完成任务:

import org.apache.spark.BarrierTaskContext

val rdd = ...

rdd.barrier.mapPartitions { it =>

  val context = BarrierTaskContext.getcontext.barrier()

  val gpus = context.resources().get("gpu").get.addresses

  // launch external process that leverages GPU

  launchProcess(gpus)

}

 

这些步骤仍处于试验阶段,将来的Spark 3.x版本中将继续进行进一步开发,以支持在命令行(带有spark-submit)和Spark任务级别上无缝发现GPU资源。

结构化流

为了检查你的结构化流作业在执行过程中的数据起伏与变化,Spark 3.0 UI在第7章中探讨了其他选项卡的同时,还提供了一个新的结构化流选项卡。此选项卡提供两组统计信息:有关已完成的流查询作业的聚合信息(图12-6)和有关流查询的详细统计信息,包括输入速率,处理速率,输入行数,批处理持续时间和操作持续时间(图 12-7)。

在图12-7的屏幕截图是采取Spark3.0.0-preview2; 在最终版本中,你应该在用户界面页面的名称标识符中看到查询名称和ID。

无需配置;所有配置均可直接在Spark 3.0安装中运行,并具有以下默认值:

spark.sql.streaming.ui.enabled=true

spark.sql.streaming.ui.retainedProgressUpdates=100

spark.sql.streaming.ui.retainedQueries=100

PySpark,Pandas UDF和Pandas Function API

Spark 3.0要求pandas 0.23.2版本或更高版本才能使用任何与pandas相关的方法,例如DataFrame.toPandas()或SparkSession.createDataFrame(pandas.DataFrame)。

此外,它需要PyArrow 0.12.1版本或更高使用PyArrow功能,例如pandas_udf(),DataFrame.toPandas()和SparkSession.createDataFrame(pandas.DataFrame)(spark.sql.execution.arrow.enabled配置集到true)。下一节将介绍Pandas UDF中的新功能。

重新设计的带有Python类型提示的Pandas UDF

通过利用Python类型提示重新设计了Spark 3.0中的Pandas UDF 。这使你可以自然地表达UDF,而无需赋值类型。pandas UDF现在更具“ Python风格”,它们本身可以定义UDF应该输入和输出的内容,而不必像在Spark 2.4中那样通过UDF进行指定,例如@pandas_udf("long", PandasUDFType.SCALAR)。

这是一个例子:

Pandas UDFs in Spark 3.0

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("long")

def pandas_plus_one(v: pd.Series) -> pd.Series:

 return v + 1

这种新格式具有许多优点,例如更容易进行静态分析。你可以采用与以前相同的方法来应用新的UDF:

df = spark.range(3)

df.withColumn(“ plus_one”,pandas_plus_one(“ id”))。show()

 

+ --- + -------- +

| id | plus_one |

+ --- + -------- +

| 0 | 1 |

| 1 | 2 |

| 2 | 3 |

+ --- + -------- +

pandasUDF中的迭代器支持

pandasUDF非常常用于加载模型并为单节点机器学习和深度学习模型执行分布式推理。但是,如果模型很大,那么Pandas UDF要在同一Python工作进程中为每个批次重复加载相同的模型,会产生很高的开销。

在Spark3.0,pandasUDF可以接受pandas.Series的迭代器的或pandas.DataFrame,如下所示:

from typing import Iterator 

@pandas_udf('long')

def pandas_plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:

 return map(lambda s: s + 1, iterator)

df.withColumn("plus_one", pandas_plus_one("id")).show()

+---+--------+

| id|plus_one|

+---+--------+

| 0| 1|

| 1| 2|

| 2| 3|

+---+--------+

有了此支持,你只能加载一次模型,而不是为迭代器中的每个系列加载模型。以下伪代码说明了如何执行此操作:

@pandas_udf(...)

def predict(iterator):

 model = ... # load model

 for features in iterator:

   yield model.predict(features)

新的Pandas Function API

Spark 3.0引入了一些新的Pandas UDF类型,当你想对整个DataFrame而不是按列应用函数时,它们很有用,如第11章中mapInPandas()介绍的那样。这些将一个迭代器作为输入,并输出另一个迭代器:pandas.DataFramepandas.DataFrame。

def pandas_filter(

 iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:

 for pdf in iterator:

 yield pdf[pdf.id == 1]

df.mapInPandas(pandas_filter, schema=df.schema).show()

+---+

| id|

+---+

| 1|

+---+

你可以通过在配置中指定spark.sql.execution.arrow.maxRecordsPerBatch来控制pandas.DataFrame的大小。请注意,与大多数Pandas UDF不同,输入大小和输出大小不必匹配。

协同组的所有数据都将加载到内存中,这意味着如果存在数据倾斜或某些组太大而无法容纳在内存中,则可能会遇到OOM问题。

Spark 3.0还引入了cogrouped map Pandas UDF。该applyInPandas()函数使用两个pandas.DataFrame共享一个公用键,并将一个函数应用于每个共同组(cogroup)。然后将返回的pandas.DataFrame组合为单个DataFrame。与mapInPandas()相同,pandas.DataFrame对返回的长度没有限制。这是一个例子:

df1 = spark.createDataFrame(

 [(1201, 1, 1.0), (1201, 2, 2.0), (1202, 1, 3.0), (1202, 2, 4.0)],

 ("time", "id", "v1"))

df2 = spark.createDataFrame(

 [(1201, 1, "x"), (1201, 2, "y")], ("time", "id", "v2"))

def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:

 return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(

 df2.groupby("id")

).applyInPandas(asof_join, "time int, id int, v1 double, v2 string").show()

 

 

+----+---+---+---+

|time| id| v1| v2|

+----+---+---+---+

|1201| 1|1.0| x|

|1202| 1|3.0| x|

|1201| 2|2.0| y|

|1202| 2|4.0| y|

+----+---+---+---+

功能变更

列出Spark 3.0中的所有功能更改会把这本书转变成几英寸厚的砖头。因此,为了简洁起见,我们将在此处提及一些值得注意的问题,并让你查阅Spark 3.0的发行说明,以获取完整的详细信息和所有细微差别。

支持和弃用的语言

Spark 3.0支持Python 3和JDK 11,并且需要Scala 2.12版本。不推荐使用Python 3.6和Java 8之前的所有版本。如果使用这些不建议使用的版本,则会收到警告消息。

对DataFrame和Dataset API的更改

在早期版本的Spark中,Dataset和DataFrame AP已弃用该unionAll()方法。在Spark 3.0中,这已被逆转,并且unionAll()现在是union()方法的别名。

此外,Spark的Dataset. groupbykey()的早期版本会导致一个分组的数据集,当键是非结构类型(int、string、array等)时,键被虚假地命名为值。因此,当显示时,查询中的ds.groupByKey().count()的聚合结果与(value, count)相反。这已被纠正,以产生(key,count),这更直观。例如:

//  In Scala

val ds = spark.createDataset(Seq(20, 3, 3, 2, 4, 8, 1, 1, 3))

ds.show(5)

 

+-----+

|value|

+-----+

|   20|

|    3|

|    3|

|    2|

|    4|

+-----+

 

ds.groupByKey(k=> k).count.show(5)

 

+---+--------+

|key|count(1)|

+---+--------+

|  1|       2|

|  3|       3|

| 20|       1|

|  4|       1|

|  8|       1|

+---+--------+

only showing top 5 rows

但是,如果愿意,可以通过设置spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue为true保留旧格式。

DataFrame和SQL Explain命令

为了获得更好的可读性和格式,Spark 3.0引入了DataFrame.explain(FORMAT_MODE)功能,用了显示Catalyst优化器生成的计划的不同视图。该选项包括simple(默认),"extended", "cost", "codegen"和" formatting "。这是一个简单的例子:

// In Scala

val strings = spark

 .read.text("/databricks-datasets/learning-spark-v2/SPARK_README.md")

val filtered = strings.filter($"value".contains("Spark"))

filtered.count()

In Python

strings = spark

 .read.text("/databricks-datasets/learning-spark-v2/SPARK_README.md")

filtered = strings.filter(strings.value.contains("Spark"))

filtered.count()

// In Scala

filtered.explain("simple")

In Python

filtered.explain(mode="simple")

== Physical Plan ==

*(1) Project [value#72]

+- *(1) Filter (isnotnull(value#72) AND Contains(value#72, Spark))

   +- FileScan text [value#72] Batched: false, DataFilters: [isnotnull(value#72),

Contains(value#72, Spark)], Format: Text, Location:

InMemoryFileIndex[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md],PartitionFilters: [], PushedFilters: [IsNotNull(value),StringContains(value,Spark)], ReadSchema: struct

// In Scala

filtered.explain("formatted")

 In Python

filtered.explain(mode="formatted")

 

== Physical Plan ==

* Project (3)

+- * Filter (2)

   +- Scan text  (1)

(1) Scan text

Output [1]: [value#72]

Batched: false

Location: InMemoryFileIndex [dbfs:/databricks-datasets/learning-spark-v2/...

PushedFilters: [IsNotNull(value), StringContains(value,Spark)]

ReadSchema: struct

(2) Filter [codegen id : 1]

Input [1]: [value#72]

Condition : (isnotnull(value#72) AND Contains(value#72, Spark))

(3) Project [codegen id : 1]

Output [1]: [value#72]

Input [1]: [value#72]

-- In SQL

EXPLAIN FORMATTED

SELECT *

FROM tmp_spark_readme

WHERE value like "%Spark%"

== Physical Plan ==

* Project (3)

+- * Filter (2)

   +- Scan text  (1)

(1) Scan text

Output [1]: [value#2016]

Batched: false

Location: InMemoryFileIndex [dbfs:/databricks-datasets/

learning-spark-v2/SPARK_README.md]

PushedFilters: [IsNotNull(value), StringContains(value,Spark)]

ReadSchema: struct

(2) Filter [codegen id : 1]

Input [1]: [value#2016]

Condition : (isnotnull(value#2016) AND Contains(value#2016, Spark))

(3) Project [codegen id : 1]

Output [1]: [value#2016]

Input [1]: [value#2016]

要查看其余的格式化模式,请在本书的GitHub repo中笔记本进行尝试使用。也可以查看从Spark 2.x到Spark 3.0的迁移指南。

概括

本章粗略介绍了Spark 3.0中的新功能。我们冒昧地提到了一些值得注意的高级功能。它们在后台运行,而不是在API级别运行。特别是,我们研究了动态分区修剪(DPP)和自适应查询执行(AQE),这两种优化可以提高Spark在执行时的性能。我们还探索了实验性Catalog API如何将Spark生态系统扩展到用于批处理和流数据的源和接收器的自定义数据存储,并研究了Spark 3.0中新的调度程序,使其能够在执行程序中利用GPU。

 

作为第7章中对Spark UI的讨论的补充,我们还向你展示了新的“结构化流”选项卡,它提供了有关流作业的累积统计信息,其他可视化效果以及每个查询的详细指标。

 

在Spark 3.0中不推荐使用低于3.6的Python版本,并且对Pandas UDF进行了重新设计,以支持Python类型的提示和迭代器作为参数。有pandas UDF,它们可以转换整个DataFrame,以及将两个共同分组的DataFrame组合成一个新的DataFrame。

 

为了提高查询计划的可读性,通过DataFrame.explain(FORMAT_MODE)和在SQL中执行EXPLAIN FORMAT_MODESQL显示逻辑和物理计划的不同级别和详细信息。此外,SQL命令现在可以获取Spark整个受支持的连接策略的提示。

 

虽然我们无法在这一简短的章节中列举最新版本的Spark中的所有更改,但我们还是建议你在发布Spark 3.0时浏览发行说明以了解更多信息。另外,为了快速总结面向用户的更改以及有关如何迁移到Spark 3.0的详细信息,我们建议你查看迁移指南。

提醒一下,本书中的所有代码均已在Spark 3.0.0-preview2上进行了测试,并且在Spark 3.0正式发布时可以与Spark 3.0一起使用。我们希望你喜欢这本书,并从与我们的旅程中学到东西。感谢你的关注!