优化和调整Spark应用程序(七)
写在前面:
大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与AI相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。
业余时间专注于输出大数据、AI等相关文章,目前已经输出了40万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。
想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。
内推信息
如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。
免费学习资料
如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!
学习交流群
如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。
在上一章中,我们详细介绍了如何在Java和Scala中使用数据集。我们探讨了Spark如何管理内存以将数据集结构作为其统一的高级API的一部分,并考虑了与使用数据集相关的成本以及如何降低这些成本。
除了降低成本外,我们还想考虑如何优化和调整Spark。在本章中,我们将讨论一组启用优化的Spark配置,查看Spark的一系列join策略,并检查Spark用户界面,寻找有关不良行为的线索。
优化和调整Spark效率
虽然Spark有许多可供调优的配置,但这本书将只涵盖少数最重要和通常被调优的配置。要获得按功能主题分组的完整列表,可以阅读官网文档。
查看和设置Apache Spark配置
你可以通过三种方式获取和设置Spark属性。首先是通过一组配置文件。在部署$SPARK_HOME目录(安装Spark的位置)中,有许多配置文件:conf / spark-defaults.conf.template,conf / log4j.properties.template和conf / spark-env.sh.template。更改这些文件中的默认值,保存为不带.template后缀的配置文件。这样一来spark会自动加载修改后的配置文件,使得修改后的值生效。
conf / spark-defaults.conf文件中修改后的配置适用于Spark集群以及所有提交给集群的Spark应用程序。
第二种方法是使用spark-submit命令,在提交应用程序的时候使用--conf 标识符直接在Spark应用程序中或在命令中指定Spark配置,如下所示:
spark-submit --conf spark.sql.shuffle.partitions = 5
--conf “ spark.executor.memory = 2g”
--class main.scala.chapter7.SparkConfig_7_1 jars / main-
scala-chapter7_2.12-1.0.jar
下面是在Spark应用程序本身中调整配置的方法:
// In Scala
import org.apache.spark.sql.SparkSession
def printConfigs(session: SparkSession) = {
// Get conf
val mconf = session.conf.getAll
// Print them
for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
}
def main(args: Array[String]) {
// Create a session
val spark = SparkSession.builder
.config("spark.sql.shuffle.partitions", 5)
.config("spark.executor.memory", "2g")
.master("local[*]")
.appName("SparkConfig")
.getOrCreate()
printConfigs(spark)
spark.conf.set("spark.sql.shuffle.partitions",
spark.sparkContext.defaultParallelism)
println(" ****** Setting Shuffle Partitions to Default Parallelism")
printConfigs(spark)
}
spark.driver.host -> 10.8.154.34
spark.driver.port -> 55243
spark.app.name -> SparkConfig
spark.executor.id -> driver
spark.master -> local[*]
spark.executor.memory -> 2g
spark.app.id -> local-1580162894307
spark.sql.shuffle.partitions -> 5
第三个选项是通过Spark shell的编程接口实现的。与Spark中的所有其他内容一样,API是交互的主要方法。通过SparkSession对象,你可以访问大多数Spark配置。
在Spark REPL中,例如,这个Scala代码显示在本地主机上Spark以本地模式启动的Spark配置(详情上可用的不同的模式,请参阅第一章中的“部署模式”):
// In Scala
// mconf is a Map[String, String]
scala> val mconf = spark.conf.getAll
...
scala> for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
spark.driver.host -> 10.13.200.101
spark.driver.port -> 65204
spark.repl.class.uri -> spark://10.13.200.101:65204/classes
spark.jars ->
spark.repl.class.outputDir -> /private/var/folders/jz/qg062ynx5v39wwmfxmph5nn...
spark.app.name -> Spark shell
spark.submit.pyFiles ->
spark.ui.showConsoleProgress -> true
spark.executor.id -> driver
spark.submit.deployMode -> client
spark.master -> local[*]
spark.home -> /Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7
spark.sql.catalogImplementation -> hive
spark.app.id -> local-1580144503745
你还可以仅查看特定于Spark SQL的Spark配置:
// In Scala
spark.sql("SET -v").select("key", "value").show(5, false)
# In Python
spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)
+------------------------------------------------------------+
|key |value |
+------------------------------------------------------------+
|spark.sql.adaptive.enabled |false |
|spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin |0.2 |
|spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled|true |
|spark.sql.adaptive.shuffle.localShuffleReader.enabled |true |
|spark.sql.adaptive.shuffle.maxNumPostShufflePartitions |
+------------------------------------------------------------+
only showing top 5 rows
另外,你可以通过Spark UI的“Environment”选项卡访问Spark的当前配置,只不过是作为只读值,我们将在本章稍后讨论该选项卡,如图7-1所示。

要以编程方式设置或修改现有配置,请首先检查该属性是否可修改。spark.conf.isModifiable(“
// In Scala
scala> spark.conf.get("spark.sql.shuffle.partitions")
res26: String = 200
scala> spark.conf.set("spark.sql.shuffle.partitions", 5)
scala> spark.conf.get("spark.sql.shuffle.partitions")
res28: String = 5
# In Python
>>> spark.conf.get("spark.sql.shuffle.partitions")
'200'
>>> spark.conf.set("spark.sql.shuffle.partitions", 5)
>>> spark.conf.get("spark.sql.shuffle.partitions")
'5'
在设置Spark属性的所有方式中,优先级顺序决定采用哪些值。优先级最低的是spark-defaults.conf中定义的配置项的值或标志,其次读取spark-submit命令行中传递的配置项的值或标志,最后读取SparkSession在Spark应用程序中通过SparkConf设置的值或标志。总结下来优先级的高低为:配置文件 < spark-submi命令 < 程序配置。最终所有这些属性都会被合并,并且优先在Spark应用程序中重置的所有重复属性。同样,命令行上提供的配置项的值将替换配置文件中对应配置项的设置,前提是这些值不会被应用程序中的相同配置覆盖。
调整或设置正确的配置有助于提高性能,正如你将在下一节中看到的那样。这里的建议来自社区中从业人员的经验,着重于如何最大程度地利用Spark的集群资源以适应大规模工作负载。
扩展Spark以应对高负载
大型Spark工作负载通常是批处理工作,有些工作是每晚执行的,有些则是每天定期执行的。无论哪种情况,这些作业都可能处理数十个TB字节甚至更多的数据。为了避免由于资源匮乏或性能逐渐下降而导致作业失败,可以启用或更改一些Spark配置。这些配置影响三个Spark组件:Spark 驱动程序,Executor和在Executor上运行的shuffle服务。
Spark驱动程序的职责是与集群管理器协调,从而在集群中启动Executor,并在其上调度Spark任务。对于大型工作负载,你可能有数百个任务。本节说明了一些可以调整或启用的配置,以优化资源利用率,并行化任务从而避免大量任务的瓶颈。一些优化想法和见解来自诸如Facebook之类的大数据公司,这些公司以TB的数据规模使用Spark,并在Spark + AI Summit大会上与Spark社区共享了这些优化方式和见解。
静态与动态资源分配
当你像之前一样将计算资源指定为spark-submit命令行参数时,相当于把资源配置写死了。这意味着,如果由于工作负载超出预期而导致以后在驱动程序中排队任务时需要更多资源,Spark将无法容纳或分配额外的资源。
相反,如果你使用Spark的动态资源分配配置,则随着大型工作负载的需求不断增加或减少,Spark驱动程序可以请求更多或更少的计算资源。在工作负载是动态的情况下(即,它们对计算能力的需求各不相同),使用资源动态分配有助于解决突然出现峰值的情况。
一个有用的用例是流,其中数据流量可能不均匀。另一个是按需数据分析,在高峰时段你可能会有大量的SQL查询。启用动态资源分配可以使Spark更好地利用资源,在不使用executor时释放它们,并在需要时获取新的executor。
以及在处理大型或变化的工作负载时,动态分配在多租户环境中也很有用,在该环境中,Spark可以与YARN,Mesos或Kubernetes中的其他应用程序或服务一起部署。但是请注意,Spark不断变化的资源需求可能会同时影响其他需要资源的应用程序。
要启用和配置动态分配,可以使用如下设置。注意这里的数字是任意的;适当的设置将取决于你的工作负载的性质,因此应进行相应的调整。其中一些配置无法在Spark REPL内设置,因此你必须以编程方式进行设置:
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
默认情况下spark.dynamicAllocation.enabled设置为false。当启用以上显示的设置时,Spark驱动程序将要求集群管理器启动的时候至少创建两个Executor进行初始化(spark.dynamicAllocation.minExecutors--executor最小值)。随着任务队列积压的增加,每次超过积压超时时间(spark.dynamicAllocation.schedulerBacklogTimeout)时,都会请求新的 Executor。在这种情况下,每当有未调度的待处理任务超过1分钟时,驱动程序将请求启动新的Executor以调度积压的任务,最多20个(spark.dynamicAllocation.maxExecutors)。相反,如果 Executor完成一项任务并且空闲了2分钟(spark.dynamicAllocation.executorIdleTimeout),Spark驱动程序将终止该任务。
配置SPARK Executor的内存和shuffle服务
仅启用动态资源分配是不够的。你还必须了解Spark如何配置和使用Executor内存的,以便Executor不会因内存不足而受JVM垃圾回收的困扰。
每个executor可用的内存由spark.executor.memory来控制。如图7-2所示,它分为三个部分:execution memory, storage memory, and reserved memory。在保留300 MB的预留内存之后,默认内存划分为60%的execution memory和40%的storage memory,以防止OOM错误。Spark文档声明此方法适用于大多数情况,但是你可以通过spark.executor.memory参数调整你期望的比例。当不使用存储内存时,Spark可以获取它以供执行内存用于执行目的,反之亦然。

执行内存用于Spark shuffle,join,排序和聚合。由于不同的查询可能需要不同的内存,因此可用内存的比例(spark.mem ory.fraction默认为0.6)可能很难设置一个合适的值,但很容易调整。相比之下,存储内存主要用于缓存用户数据结构和从DataFrame派生的分区。
在map和shuffle操作期间,Spark会写入和读取本地磁盘的shuffle文件,因此I/O活动频繁。这可能会导致瓶颈,因为对于大型Spark作业,默认配置不理想。知道要如何调整不合理的配置可以减轻Spark作业各个阶段的风险。
在表7-1中,我们抓取了一些建议的配置来进行调整,以便这些操作过程中的map、spill和合并过程不受效率低下的I/O所困扰,并使这些操作能够在将最终的shuffle分区写入磁盘之前使用缓冲区内存。调整在每个executor上运行的shuffle服务也有助于提高大型Spark工作负载的整体性能。



该表中的建议并非适用于所有情况,但是它们应该使你了解如何根据工作负载来调整这些配置。与性能调整中的所有其他内容一样,你必须进行尝试,直到找到合适的平衡。
最大化SPARK并行性
Spark的效率很大程度上是因为它能够大规模并行运行多个任务。要了解如何最大程度地提高并行度(即尽可能并行读取和处理数据),你必须研究Spark如何将数据从存储中读取到内存中以及分区对Spark意味着什么。
在数据管理用语中,分区是一种将数据排列成可配置和可读的数据块或磁盘上的连续数据块的方式。这些数据子集可以独立读取或处理,如果有必要,可以通过一个进程中的多个线程并行读取或处理。这种独立性很重要,因为它允许数据处理的大量并行性。
Spark在并行处理任务方面非常高效。正如你在第2章中了解到的那样,对于大规模工作负载,Spark作业将具有多个阶段,并且在每个阶段内将有许多任务。Spark最多会为每个内核的每个任务分配一个线程,并且每个任务将处理一个不同的分区。为了优化资源利用并最大程度地提高并行度,理想的情况是分区至少与Executor上内核的数量一样多,如图7-3所示。如果每个Executor上的分区数量多于内核数量,则所有内核都将保持繁忙状态。你可以将分区视为并行性的基本单位:在单个内核上运行的单个线程可以在单个分区上工作。

如何创建分区
如前所述,Spark的任务将数据从磁盘读取到内存中。磁盘上的数据按块或连续的文件块排列。默认情况下,数据存储上的文件块大小从64 MB到128 MB不等。例如,在HDFS和S3上,默认大小为128 MB(可配置)。这些连续块的集合构成一个分区。
Spark中分区的大小由spark.sql.files.maxPartitionBytes决定。默认值为128 MB。你可以减小大小,但这可能会导致所谓的“小文件问题”,即许多小分区文件,由于文件系统操作,例如,打开,关闭和列出目录,而引入了过多的磁盘I/O和性能下降。在分布式文件系统上可能会很慢。
当你显式使用DataFrame API的某些方法时,也会创建分区。例如,在创建大型DataFrame或从磁盘读取大型文件时,可以显式指示Spark创建一定数量的分区:
// In Scala
val ds = spark.read.textFile("../README.md").repartition(16)
ds: org.apache.spark.sql.Dataset[String] = [value: string]
ds.rdd.getNumPartitions
res5: Int = 16
val numDF = spark.range(1000L * 1000 * 1000).repartition(16)
numDF.rdd.getNumPartitions
numDF: org.apache.spark.sql.Dataset[Long] = [id: bigint]
res12: Int = 16
最后,在shuffle阶段创建shuffle分区。默认情况下,spark.sql.shuffle.partitions中的shuffle分区的数量设置为200 。你可以根据拥有的数据集的大小来调整此数值,以减少通过网络发送给 Executor任务的小分区的数量。
spark.sql.shuffle.partitions对于较小的工作流或流工作负载的默认值太大;你可能希望将其减小到一个较低的值,例如executor上的内核数或更少。
在诸如groupBy()或join()的操作(也称为宽转换,wide transformations)期间创建的shuffle分区会占用网络和磁盘I/O资源。在执行这些操作期间,shuffle会将结果分发到spark.local.directory中指定位置的 Executor的本地磁盘上。使用高性能的SSD磁盘来执行此操作将提高性能。
对于shuffle阶段设置的shuffle分区数量没有通用的计算公式。该数值可能取决于你的用例、数据集、核数和可用的executor内存,这是一种反复试验的方法。
除了为大型工作负载扩展Spark外,要提高性能,你还可以考虑缓存或持久存储经常访问的DataFrames或表。在下一节中,我们将探讨各种缓存和持久性选项。
数据缓存和持久化
缓存和持久化有什么区别?在Spark中,它们是同义词。两个API调用cache()和persist()提供了这些功能。后者可以更好地控制数据的存储方式和位置——在内存和磁盘中(序列化和非序列化)。两者都有助于提高频繁访问的DataFrame或表的性能。
DataFrame.cache()
cache()将在内存允许的范围内存储跨Spark Executor读取的所有分区(见图7-2)。尽管DataFrame可能被部分缓存,但是分区不能被部分缓存(例如,如果你有8个分区,但内存中只能容纳4.5个分区,那么将仅缓存4个)。但是,如果不是所有分区都被缓存,则当你要再次访问数据时,必须重新计算未缓存的分区,这会降低Spark作业的速度。
让我们看一个示例,该示例说明在访问DataFrame时如何缓存大型DataFrame可以提高性能:
// In Scala
// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.cache() // Cache the data
df.count() // Materialize the cache
res3: Long = 10000000
Command took 5.11 seconds
df.count() // Now get it from the cache
res4: Long = 10000000
Command took 0.44 seconds
第一个count()实例化了缓存,而第二个count()访问了缓存,从而使该数据集的访问时间快了近12倍。
当你使用cache()或时persist(),直到你调用遍历每条记录的操作(例如count()),DataFrame才会被完全缓存。如果你使用类似的操作take(1),则只有一个分区将被缓存,因为Catalyst意识到你不必为了检索一条记录而计算所有分区。
观察DataFrame如何跨本地主机上的一个executor存储,如图7-4所示,我们可以看到它们都完全放在了内存中(记住较低级别的DataFrame由RDD支持)。

DataFrame.persist()
persist(StorageLevel.LEVEL)具有细微差别,可通过StorageLevel来控制如何缓存数据的级别。表7-2总结了不同的存储级别。磁盘上的数据总是使用Java或Kryo序列化进行序列化。


每个StorageLevel(除外OFF_HEAP)都有一个等价的LEVEL_NAME_2,这意味着在两个不同的Spark Executor上重复两次:MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2等。虽然此选项使用成本很高,但它允许在两个地方进行数据局部化,从而提供了容错能力,并让Spark可以选择将任务调度到数据副本的本地执行。
让我们看与上一节相同的示例,但是使用persist()方法:
// In Scala
import org.apache.spark.storage.StorageLevel
// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.persist(StorageLevel.DISK_ONLY) // Serialize the data and cache it on disk
df.count() // Materialize the cache
res2: Long = 10000000
Command took 2.08 seconds
df.count() // Now get it from the cache
res3: Long = 10000000
Command took 0.38 seconds
从图7-5中可以看到,数据保留在磁盘上,而不是内存中。要取消持久化缓存的数据,只需调用DataFrame.unpersist()。

最后,不仅可以缓存DataFrame,还可以缓存从DataFrame派生的表或视图。使它们在Spark UI中更具可读性。例如:
// In Scala
df.createOrReplaceTempView("dfTable")
spark.sql("CACHE TABLE dfTable")
spark.sql("SELECT count(*) FROM dfTable").show()
+--------+
|count(1)|
+--------+
|10000000|
+--------+
Command took 0.56 seconds
何时缓存和持久化
缓存的常见应用场景是重复访问大数据集以进行查询或转换。一些示例包括:
DataFrames常用于迭代机器学习训练中
DataFrames频繁被用于ETL期间进行频繁转换或建立数据管道
什么时候不缓存和持久化
并非所有用例都规定了需要缓存,有一些场景是不需要访问DataFrame的,比如下面的例子:
l DataFrame太大而内存无法满足需求
l 在DataFrame上进行廉价不频繁的转换,而无需考虑它的大小
通常,应谨慎使用内存缓存,因为它可能会导致序列化和反序列化从而导致资源消耗,这取决于所使用的StorageLevel。
接下来,我们将重点转移到讨论几个常见的Spark连接操作上,这些操作会触发高代价的数据移动,要求集群提供计算和网络资源,以及如何通过组织数据来减轻这种移动。
Spark 连接策略
连接操作是大数据分析中一种常见的转换类型,其中两个以表或DataFrames形式的数据集通过一个公共的配对键合并。与关系型数据库的表关联类似,Spark DataFrame和Dataset API以及Spark SQL提供了一系列连接转换:内部连接,外部连接,左连接,右连接等。所有的这些操作都会触发Spark Executor之间的数据移动。
这些转换的核心是Spark如何计算和要生成什么数据,以及将相关联的数据写入到磁盘中,以及如何将这些key和数据传输到节点上进行一系列操作,如groupBy(),join(),agg(),sortBy()和reduceByKey()。以上这些操作我们通常称为shuffle操作,也就是常说的“洗牌”。
Spark具有五种不同的连接策略,通过它可以在Executor之间交换,移动,排序,分组和合并数据:
Shuffle Hash Join(SHJ):shuffle 哈希连接
Broadcast Hash Join(BHJ):广播哈希连接
Sort Merge Join(SMJ):排序合并连接
Cartesian Join(CJ):笛卡尔积连接
Broadcast Nested Loop Join(BNLJ):广播嵌套连接
在这里,我们仅关注其中的两种策略(BHJ和SMJ),也是我们在开发中最常见的两种连接策略。
广播哈希连接(BHJ)
map-side-only join也称为“仅在map端的连接”,当需要将两个数据集和另一个足够大数据集结合使用时,其中一个数据集较小,适合加载到Driver和Executor内存中,为了避免大规模数据移动,采用广播哈希连接。使用Spark广播变量,较小的数据集由驱动程序广播到所有Spark Executor,如图7-6所示,随后将其与每个Executor上的较大数据集合并。这种策略避免了大量的数据交换。

默认情况下,如果较小的数据集小于10 MB,Spark将使用广播连接。该配置spark.sql.autoBroadcastJoinThreshold进行设置; 你可以根据每个executor和驱动程序中的内存大小来进行动态调整。如果你确信有足够的内存,则可以对大于10 MB(甚至最大100 MB)的DataFrame使用广播连接。
一个常见的用例是,当你在两个DataFrame之间拥有一组通用的key时,其中一个DataFrame包含的信息少于另一个DataFrame,并且这时候你需要将两者合并成视图。例如,考虑一个简单的情况,你拥有世界各地大量的足球运动员的数据集playersDF以及球员所在足球俱乐部的数据集clubsDF,其中clubsDF数据集较小,并且你希望通过一个公共的key将两者连接起来:
// In Scala
import org.apache.spark.sql.functions.broadcast
val joinedDF = playersDF.join(broadcast(clubsDF), "key1 === key2")
在此代码中,我们强制Spark进行广播连接,但是默认情况下,但如果较小的数据集的大小小于spark.sql.autoBroadcastJoinThreshold,那么会默认使用这种连接策略。
BHJ是Spark提供的最简单,最快的连接策略,因为它不涉及任何数据集。经过spark广播之后,所有数据都可以在本地供 Executor使用。你只需要确保Spark驱动程序和Executor都具有足够的内存,就可以将较小的数据集保存在内存中。
在执行该操作之后的任何时间,你都可以通过执行以下命令查看物理计划中执行了哪些连接操作:
joinDF.explain(mode)
在Spark 3.0中,你可以使用joinedDF.explain('mode') 显示一个可读的和易于理解的输出,该模式包括了'simple', 'extended', 'codegen', 'cost'和'formatted'这几种类型。
何时使用广播哈希连接
在以下条件下使用这种类型的连接以获得最大利益:
当较小和较大数据集中的每个键被Spark散列到同一分区时
当一个数据集比另一个数据集小得多时(并且在默认配置10 MB内;如果有足够的内存,则更多;如果不超过10 MB,则默认配置为10 MB)
当你只想执行等值连接时,根据匹配的未排序key关联两个数据集
当你不必担心使用过多的网络带宽资源或者OOM错误时,因为较小的数据集将广播给所有Spark Executor
在Spark中指定spark.sql.autoBroadcastJoinThreshold 的值为-1,则会导致Spark一直采用shuffle排序合并连接策略(SMJ),我们将在下一节中讨论。
Shuffle排序合并连接(SMJ)
排序合并算法是基于某个相同的key合并两个大的数据集的有效方法,该key是可排序,唯一的、且可以分配给或存储在同一个分区上,也就是说,两个数据集的公共哈希key最终会落在同一分区上。从Spark的角度来看,这意味着每个数据集中具有相同key的所有行都将散列在同一Executor的同一分区上。显然,这意味着数据必须在Executor之间进行协调或交换。
顾名思义,此连接方案有两个阶段:排序阶段和合并阶段。排序阶段根据连接的key对每个数据集进行排序;合并阶段则是从每个数据集中迭代行中的每个key,如果两个key匹配,则合并这些行。
默认情况下,通过spark.sql.join.preferSortMergeJoin启用SortMergeJoin。以下是本书的GitHub repo中可用于独立应用程序notebook中的代码段。主要思想是提取两个具有一百万条记录的大型DataFrame,并将它们通过公共的key进行连接,即uid == users_id。
虽然该数据是合成的,但也能说明了这一点:
// In Scalaimport scala.util.Random
// Show preference over other joins for large data sets
// Disable broadcast join// Generate data...spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// Generate some sample data for two data sets
var states = scala.collection.mutable.Map
var items = scala.collection.mutable.Map
val rnd = new scala.util.Random(42)
// Initialize states and items purchasedstates += (0 -> "AZ", 1 -> "CO", 2-> "CA", 3-> "TX", 4 -> "NY", 5-> "MI")items += (0 -> "SKU-0", 1 -> "SKU-1", 2-> "SKU-2", 3-> "SKU-3", 4 -> "SKU-4", 5-> "SKU-5")
// Create DataFrames
val usersDF = (0 to 1000000).map(id => (id, s"user_{id}'
s"user_${id}@databricks.com", states(rnd.nextInt(5))))
.toDF("uid", "login", "email", "user_state")
val ordersDF = (0 to 1000000)
.map(r => (r, r, rnd.nextInt(10000), 10 * r* 0.2d,
states(rnd.nextInt(5)), items(rnd.nextInt(5)))).
toDF("transaction_id", "quantity", "users_id", "amount", "state", "items")
// Do the join
val usersOrdersDF = ordersDF.join(usersDF, "uid")
// Show the joined resultsusersOrdersDF.show(false)
+--------------+--------+--------+--------+-----+-----+---+---+-------|transaction_id|quantity|users_id|amount|state|items|uid|...|user_state|
+--------------+--------+--------+--------+-----+-----+---+---+-------
|3916 |3916 |148 |7832.0 |CA |SKU-1|148|...|CO |
|36384 |36384 |148 |72768.0 |NY |SKU-2|148|...|CO |
|41839 |41839 |148 |83678.0 |CA |SKU-3|148|...|CO |
|48212 |48212 |148 |96424.0 |CA |SKU-4|148|...|CO |
|48484 |48484 |148 |96968.0 |TX |SKU-3|148|...|CO |
|50514 |50514 |148 |101028.0|CO |SKU-0|148|...|CO |
|65694 |65694 |148 |131388.0|TX |SKU-4|148|...|CO |
|65723 |65723 |148 |131446.0|CA |SKU-1|148|...|CO |
93125 |93125 |148 |186250.0|NY |SKU-3|148|...|CO |
|107097 |107097 |148 |214194.0|TX |SKU-2|148|...|CO |
|111297 |111297 |148 |222594.0|AZ |SKU-3|148|...|CO |
|117195 |117195 |148 |234390.0|TX |SKU-4|148|...|CO |
|253407 |253407 |148 |506814.0|NY |SKU-4|148|...|CO |
|267180 |267180 |148 |534360.0|AZ |SKU-0|148|...|CO |
|283187 |283187 |148 |566374.0|AZ |SKU-3|148|...|CO |
|289245 |289245 |148 |578490.0|AZ |SKU-0|148|...|CO |
|314077 |314077 |148 |628154.0|CO |SKU-3|148|...|CO |
|322170 |322170 |148 |644340.0|TX |SKU-3|148|...|CO |
|344627 |344627 |148 |689254.0|NY |SKU-3|148|...|CO |
|345611 |345611 |148 |691222.0|TX |SKU-3|148|...|CO |
+--------------+--------+--------+--------+-----+-----+---+---+-----
only showing top 20 rows
检查我们的最终执行计划,我们注意到Spark使用了SortMerge Join来连接两个DataFrame。该Exchange操作是对每个Executor上的map操作的结果的重新排列:
usersOrdersDF.explain()
== Physical Plan ==
InMemoryTableScan [transaction_id#40, quantity#41, users_id#42, amount#43,
state#44, items#45, uid#13, login#14, email#15, user_state#16]
+- InMemoryRelation [transaction_id#40, quantity#41, users_id#42, amount#43,
state#44, items#45, uid#13, login#14, email#15, user_state#16],
StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(3) SortMergeJoin [users_id#42], [uid#13], Inner
:- *(1) Sort [users_id#42 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(users_id#42, 16), true, [id=#56]
: +- LocalTableScan [transaction_id#40, quantity#41, users_id#42,
amount#43, state#44, items#45]
+- *(2) Sort [uid#13 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(uid#13, 16), true, [id=#57]
+- LocalTableScan [uid#13, login#14, email#15, user_state#16]
此外,Spark UI(我们将在下一节中讨论)显示了整个作业的三个阶段:Exchange和Sort操作在最后阶段进行,然后合并结果,如图7-7和7-8所示。。这样做交换的成本很昂贵,并且需要在Executor之间通过网络对分区进行shuffle。


优化shuffle排序合并连接
如果我们为常见的排序键或列创建分区桶,则可以从该方案中省去Exchange步骤。也就是说,我们可以创建大量的存储桶来存储特定的排序列(每个存储桶一个键)。通过这种方式对数据进行预分类和重组可以提高性能,因为它使我们可以跳过昂贵的数据交换操作并直接进行操作WholeStageCodegen。
在本章notebook的以下代码片段中(在本书的GitHub repo中可以找到),我们将按连接的users_id和uid列进行排序和分桶,并将桶以Parquet格式保存为Spark管理表:
// In Scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
// Save as managed tables by bucketing them in Parquet format
usersDF.orderBy(asc("uid"))
.write.format("parquet")
.bucketBy(8, "uid")
.mode(SaveMode.OverWrite)
.saveAsTable("UsersTbl")
ordersDF.orderBy(asc("users_id"))
.write.format("parquet")
.bucketBy(8, "users_id")
.mode(SaveMode.OverWrite)
.saveAsTable("OrdersTbl")
// Cache the tables
spark.sql("CACHE TABLE UsersTbl")
spark.sql("CACHE TABLE OrdersTbl")
// Read them back in
val usersBucketDF = spark.table("UsersTbl")
val ordersBucketDF = spark.table("OrdersTbl")
// Do the join and show the results
val joinUsersOrdersBucketDF = ordersBucketDF
.join(usersBucketDF, $"users_id" === $"uid")
joinUsersOrdersBucketDF.show(false)
+--------------+--------+--------+---------+-----+-----+---+---+------
|transaction_id|quantity|users_id|amount|state|items|uid|...|user_state|
+--------------+--------+--------+---------+-----+-----+---+---+------
|144179 |144179 |22 |288358.0 |TX |SKU-4|22 |...|CO |
|145352 |145352 |22 |290704.0 |NY |SKU-0|22 |...|CO |
|168648 |168648 |22 |337296.0 |TX |SKU-2|22 |...|CO |
|173682 |173682 |22 |347364.0 |NY |SKU-2|22 |...|CO |
|397577 |397577 |22 |795154.0 |CA |SKU-3|22 |...|CO |
|403974 |403974 |22 |807948.0 |CO |SKU-2|22 |...|CO |
|405438 |405438 |22 |810876.0 |NY |SKU-1|22 |...|CO |
|417886 |417886 |22 |835772.0 |CA |SKU-3|22 |...|CO |
|420809 |420809 |22 |841618.0 |NY |SKU-4|22 |...|CO |
|659905 |659905 |22 |1319810.0|AZ |SKU-1|22 |...|CO |
|899422 |899422 |22 |1798844.0|TX |SKU-4|22 |...|CO |
|906616 |906616 |22 |1813232.0|CO |SKU-2|22 |...|CO |
|916292 |916292 |22 |1832584.0|TX |SKU-0|22 |...|CO |
|916827 |916827 |22 |1833654.0|TX |SKU-1|22 |...|CO |
|919106 |919106 |22 |1838212.0|TX |SKU-1|22 |...|CO |
|921921 |921921 |22 |1843842.0|AZ |SKU-4|22 |...|CO |
|926777 |926777 |22 |1853554.0|CO |SKU-2|22 |...|CO |
|124630 |124630 |22 |249260.0 |CO |SKU-0|22 |...|CO |
|129823 |129823 |22 |259646.0 |NY |SKU-4|22 |...|CO |
|132756 |132756 |22 |265512.0 |AZ |SKU-2|22 |...|CO |
+--------------+--------+--------+---------+-----+-----+---+---+-----
only showing top 20 rows
连接的输出按uid和users_id做排序,因为我们保存的表是升序排列的。因此,在SortMergeJoin期间无需进行排序。查看Spark UI(图7-9),我们可以看到我们跳过了Exchange并直接转到WholeStageCodegen。
物理计划还显示,与插入前的物理计划相比,没有执行Exchange:Exchange与存储之前的物理计划相比,该物理计划还显示未执行任何操作:
joinUsersOrdersBucketDF.explain()
== Physical Plan ==
*(3) SortMergeJoin [users_id#165], [uid#62], Inner
:- *(1) Sort [users_id#165 ASC NULLS FIRST], false, 0
: +- *(1) Filter isnotnull(users_id#165)
: +- Scan In-memory table `OrdersTbl` [transaction_id#163, quantity#164,
users_id#165, amount#166, state#167, items#168], [isnotnull(users_id#165)]
: +- InMemoryRelation [transaction_id#163, quantity#164, users_id#165,
amount#166, state#167, items#168], StorageLevel(disk, memory, deserialized, 1
replicas)
: +- *(1) ColumnarToRow
: +- FileScan parquet
...


何时使用shuffle排序合并连接
在以下条件下使用这种类型的连接以获得最大利益:
当两个大型数据集中的每个键可以排序并通过Spark散列到同一分区时。
当你只想执行等值连接,基于匹配的排序键组合两个数据集时。
当你要防止Exchange和Sort操作,以夸网络节省大量的shuffle操作时。
到目前为止,我们已经介绍了与调整和优化Spark有关的操作方面,以及Spark如何在两次常见的连接操作期间交换数据。我们还演示了如何通过使用桶来避免大量的数据交换从而提高shuffle排序合并连接操作的性能。
正如你在前面的图中所看到的,Spark UI是可以对这些操作进行可视化分析的有效渠道。它显示了收集到的指标和程序状态,揭示了有关可能的性能瓶颈的大量信息以及线索。在本章的最后部分,我们讨论在Spark UI中可以查看哪些内容。
查看Spark UI
Spark提供了精心设计的Web UI,使得我们能够检查应用程序的各个组件。它提供了有关内存使用情况、作业、阶段和任务的详细信息,以及事件时间表,日志以及各种指标和统计信息,可让你深入了解Spark应用程序中在Spark驱动程序级别和单个Executor中发生的情况。
spark-submit 作业同时会启动Spark UI,你可以在本地主机上(在本地模式下)或通过默认端口4040上的Spark驱动程序(在其他模式下)进行访问。
学习Spark UI选项卡
Spark UI有六个选项卡,如图7-10所示,每个选项卡都给我们提供了探索的机会。让我们看一下每个选项卡向我们展示的内容。

本讨论适用于Spark 2.x和Spark 3.0。虽然Spark 3.0中的大部分UI相同,但它还添加了第七个选项卡,即“Structured Streaming”。我买将在第12章中进行预览。
Jobs和Stages
正如你在第2章中了解到的那样,Spark将应用程序细分为作业、阶段和任务。通过“Jobs和Stages”选项卡,你可以浏览这些内容并向下钻取一个细粒度的级别,以检查各个任务的详细信息。你可以查看它们的完成状态并查看与I/O、内存消耗以及执行时间等相关的指标。
图7-11显示了展开的事件时间线的“Jobs”选项卡,显示了Executor何时被添加到集群或从集群中删除了 。它还提供了集群中所有已完成作业的表格列表。“Duration”列表示完成每个作业所花费的时间(由第一列中的JobID标识)。如果该时间耗时很长,则表明你需要分析该作业的各个阶段,以查看哪些任务可能会导致延迟。通过这个摘要页面,你还可以访问每个作业的详细信息页面,包括DAG可视化和已完成阶段的列表。

Stages”选项卡提供了应用程序中所有作业的所有阶段的当前状态的摘要。你还可以访问每个阶段的详细信息页面,提供有关其任务的DAG和指标(图7-12)。除了其他一些可选的统计信息之外,你还可以看到每个任务的平均持续时间,在垃圾回收(GC)上花费的时间以及读取的shuffle字节/记录数。如果从远程executor读取shuffle数据,则较高的shuffle读取阻塞时间会发出I/O问题的信号。较高的GC时间表示堆上的对象太多(你的Executor可能内存不足)。如果一个阶段的最大任务时间远远大于中位数,则可能是由于分区中数据分布不均而导致数据倾斜。让我们找出一些有说服力的现象来说明问题。

你还可以在此页面上看到每个执行者的聚合指标以及每个任务的明细。
Executors
“Executors”选项卡提供为应用程序创建的Executor的有关信息。正如你在图7-13看到的,你可以深入了解有关资源使用情况(磁盘,内存,内核)、在GC上花费的时间以及shuffle过程中写入和读取的数据量等详细信息。

除了汇总统计数据,你还可以查看每个executor如何使用内存以及用于什么目的。这还有助于当你在DataFrame或托管表上使用cache()或persist()方法时查看资源使用情况,我们将在下面讨论这些问题。
Storage
在“shuffle排序合并连接”中的Spark代码中,在关联后缓存了两个托管表。如图7-14所示,“Storage”选项卡提供了有关应用程序使用cache()或persist()方法而缓存的任何表或DataFrame的信息。

单击图7-14中的“ In-memory table`UsersTbl`”链接,可以进一步了解该表是如何在1个Executor和8个分区上的内存和磁盘上缓存的,这个数字对应于我们为该表创建的桶的数量(请参见图7-15)。

SQL
通过SQL选项卡可以跟踪和查看作为Spark应用程序的一部分而执行的Spark SQL查询的效果。你可以查看执行查询的时间,执行了哪些作业及其持续时间。例如,在SortMergeJoin示例中,我们执行了一些查询;所有这些查询都显示在图7-16中,其链接可以进一步钻取。

单击查询描述将显示所有物理操作的执行计划的详细信息,如图7-17所示。根据该计划,在这里,每个物理运算符Scan In-memory table、HashAggregate和Exchange都是 SQL指标。
当我们要检查物理操作符的详细信息并探索发生了什么事情:扫描了多少行,写入了多少shuffle字节等等,这些度量标准非常有用。

Environment
如图7-18所示,“Environment”选项卡与其他选项卡一样重要。了解你的Spark应用程序运行的环境,会发现许多对故障排除有用的线索。实际上,必须知道设置了哪些环境变量,包括了哪些jar,设置了哪些Spark属性(以及它们各自的值,特别是如果你对“优化和调整Spark效率”中提到的某些配置进行了调整),设置什么系统属性,使用哪种运行时环境(例如JVM或Java版本)等。所有这些只读详细信息都是非常重要的信息,如果你发现Spark应用程序中有任何异常行为,可以以此作为依据进行排查和调整。

SPARK应用程序调试
在本节中,我们浏览了Spark UI中的各个选项卡。如你所见,UI提供了大量信息,可用于调试和解决Spark应用程序中的问题。除了我们在这里介绍的内容之外,它还提供对驱动程序和Executor stdout / stderr日志的访问,在其中你可能已记录了部分调试信息。
通过UI进行调试与在你最喜欢的IDE中逐步执行应用程序不同,过程更像侦查,跟踪线索,尽管你更喜欢这种方法,也可以在本地诸如IntelliJ IDEA之类的IDE中调试Spark应用程序。
“ Spark 3.0 UI”选项卡显示了发生情况的有价值的线索,以及访问驱动程序和Executor stdout / stderr的日志,你可能已在其中记录了某些调试信息。
最初,大量的信息可能会使新手不知所措。但是随着时间的流逝,你将了解在每个选项卡中查找的内容,并且可以更快地检测和诊断异常。这样的调试模式将变得清晰明了,在运行一些Spark示例后,通过经常访问这些选项卡并熟悉它们,你将习惯于通过UI调整和检查Spark应用程序。
总结
在本章中,我们讨论了许多用于优化Spark应用程序的优化技术。如你所见,通过调整一些默认的Spark配置,可以改善大型工作负载的伸缩性,增强并行性,并最大程度地减少Spark Executor之间的内存不足。你还可以了解如何使用适当级别的缓存和持久化策略来加快对常用数据集的访问,并且我们研究了Spark使用的两个常用连接进行复杂聚合,并演示了DataFrames如何按key排序进行分桶,借此跳过shuffle操作。
最后,为了更直观地了解性能,Spark UI提供了可视化界面。尽管UI内容丰富且详细,但它并不等效于IDE中的逐步调试。但是我们展示了如何通过Spark UI的六个选项卡检查和收集指标和统计数据,包括计算和内存使用数据以及SQL查询执行跟踪等信息。
在下一章中,我们将深入探讨结构化流,并向你展示在前几章中了解到的结构化API如何使你连续地编写流应用程序和批处理应用程序,从而使你能够构建可靠的数据湖和管道。