SparkSQL内核剖析
概述:
之前有尝试在SparkSQL内核添加自定义SQL操作不同的底层数据源,实现计算分析任务,这里就对SparkSQL的Catalyst模块进行简要的分析。在早期大数据时代的大规模处理数据的技术是Hadoop提供的MapReduce任务,但这种框架执行效率太慢,进行一些关系型处理(如join)需要编写大量代码。后来hive这种框架可以让用户输入sql语句,自动进行优化并执行,降低了写MR代码的成本,同样对于早期的Spark来说,用户需要通过写RDD来完成执行逻辑,这样就使代码可读性不高,并且具体的执行逻辑不是最优的,会影响Spark任务的运行效率。
1. SparkSQL的基本介绍:

2. SparkSQL逻辑计划概述:
select fieldA, fieldB, filedC from tableA where fieldA > 10;
SQL主要由Projection(filedA,fieldB,fieldC),DataSource(tableA)和Filter(fieldA>10)三个部分组成,分别对应SQL查询过程中的Result,DataSource和Operation:
实际的SQL执行顺序过程是按照Opertaion->DataSouece->Result的顺序,刚好与SQL的语法刚好相反,具体包括:
在SparkSQL中同样会先将SQL语句进行Parse形成一个Tree,然后使用Rule对Tree进行绑定,优化等处理过程(这里通过匹配模式对不同的节点采用不同的操作)。这个最核心的过程就是由Spark的Catalyst负责完成SQL的解析,绑定,优化以及生成物理计划。SparkSQL模块主要由core,catalyst,hive和hive-thriftserver组成:
core:负责处理数据的输入/输出,从数据源获取数据,输出DataFrame;
catalyst:SQL的解析,绑定,优化以及生成物理计划
hive:负责对hive数据的处理
hive-thriftserver:提供CLI和JDBC接口等。

论论文SparkSQL Catalyst的解析流程图:

/**
* 测试代码
*/
object TestSpark {
case class Person(name: String, age: Long)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("test").master("local").getOrCreate()
import spark.implicits._
val df: DataFrame = spark.sparkContext.parallelize(Array(
Person("zhangsan", 10),
Person("lisi", 20),
Person("wangwu", 30)))
.toDF("name", "age")
df.createTempView("people")
/**
* 1.SparkSqlParser中的AstBuilder将语法树的各个节点转换为对应LogicalPlan节点,组成未解析的逻辑算子树,不包含数据信息与列信息
*
* 2.Analyzer将一系列规则作用在未解析逻辑算子树上,生成解析后的逻辑算子树
*
* 3.Optimizer将一系列优化规则应用在逻辑算子树中,确保结果正确的前提下改进低效结构,生成优化后的逻辑算子树
*/
val sql = spark.sql("select name from people where age >= 20")
sql.queryExecution.debug
}
}
2.1 SQL Parse
SparkSQL的采用的是Anrl4进行SQL的词法和语法解析(Spark SQL和Presto采用的是Antlr4,FLink采用的是Calcite),Anrl4主要提供了Parser编译器和Translator解释器框架。
在Spark源码中提供了一个.g4文件,编译的时候会使用Antlr根据这个.g4生成对应的词法分析类和语法分析类,采用访问者模式,即会去遍历生成的语法树(针对语法树中每个节点生成一个visit方法),以及返回相应的值,用以构建Logical Plan语法树。

SparkSQL使用Antlr4的访问者模式,生成Unresolved Logical Plan。这里,可以用IDEA ANTLR Preview插件可以看到到SQL解析后生成的语法树,譬如:
SELECT A FROM TABLE
转换成一棵语法树的可视图,SparkBase.g4文件还有很多其他类型的语句,比如INSERT,ALERT等等。

其中,LogicalPlan其实是继承自TreeNode,所以本质上LogicalPlan就是一棵树。Tree提供UnaryNode,BinaryNode和LeafNode三种trait:
LeafNode,叶子节点,一般用来表示用户命令
UnaryNode,一元节点,表示FILTER等操作
BinaryNode,二元节点,表示JOIN,GROUP BY等操作
/**
* A logical plan node with no children.
*/
trait LeafNode extends LogicalPlan with LeafLike[LogicalPlan]
/**
* A logical plan node with single child.
*/
trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan]
/**
* A logical plan node with a left and right child.
*/
trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan]
譬如上述SQL在parse阶段使用antlr4,将一条SQL语句解析成语法树,然后使用antlr4的访问者模式遍历生成语法树,也就是Logical Plan,但此时还是Unresolved LogicalPlan,即无法确定src是否存在,以及具体的的元数据是什么样。没有通过Analysis阶段,无法确定A的具体类型以及TABLE这个数据源是否存在等信息,只有通过Analysis阶段后,才会把Unresolved变成Resolved LogicalPlan。
spark.sql("select name from people where age >= 20")
logicalPlan: 'Project ['name]
+- 'Filter ('age >= 20)
+- 'UnresolvedRelation `people`
2.2 Analyzed
在Analysis阶段,使用Analysis Rules结合SeesionCatalog元数据,对会将Unresolved LogicalPlan进行解析,生成Resolved LogicalPlan的。Spark SQL通过使用Catalyst rule和Catalog来跟踪数据源的table信息。这个阶段核心处理类是Analyzer类,自身实现大量的rule,然后注册到batch变量中:
override def batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
// at the beginning of analysis.
OptimizeUpdateFields,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
SubstituteUnresolvedOrdinals),
Batch("Disable Hints", Once,
new ResolveHints.DisableHints),
Batch("Hints", fixedPoint,
ResolveHints.ResolveJoinStrategyHints,
ResolveHints.ResolveCoalesceHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions(v1SessionCatalog) ::
ResolveNamespace(catalogManager) ::
new ResolveCatalogs(catalogManager) ::
ResolveUserSpecifiedColumns ::
ResolveInsertInto ::
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
AddMetadataColumns ::
DeduplicateRelations ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
ResolveNewInstance ::
ResolveUpCast ::
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ResolveOutputRelation ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables ::
ResolveHigherOrderFunctions(catalogManager) ::
ResolveLambdaVariables ::
ResolveTimeZone ::
ResolveRandomSeed ::
ResolveBinaryArithmetic ::
ResolveUnion ::
typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
Batch("Apply Char Padding", Once,
ApplyCharTypePadding),
Batch("Post-Hoc Resolution", Once,
Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
Batch("Remove Unresolved Hints", Once,
new ResolveHints.RemoveAllHints),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,
HandleNullInputsForUDF,
ResolveEncodersInUDF),
Batch("UpdateNullability", Once,
UpdateAttributeNullability),
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases),
Batch("HandleAnalysisOnlyCommand", Once,
HandleAnalysisOnlyCommand)
)
具体调用RuleExecutor的execute方法串行执行这些Rule,匹配UnresolvedRelation,然后递归去Catlog中获取对应的元数据信息,递归将它及子节点变成Resoulved。
spark.sql("select name from people where age >= 20")
analyzed: Project [name#6]
+- Filter (age#7L >= cast(20 as bigint))
+- SubqueryAlias `people`
+- Project [name#3 AS name#6, age#4L AS age#7L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, TestSpark$Person, true])).name, true, false) AS name#3, assertnotnull(assertnotnull(input[0, TestSpark$Person, true])).age AS age#4L]
+- ExternalRDD [obj#2]
可以看到经过Analyzed分析后,Parse阶段的Unresolved LogicalPlan进行展开,每一步的操作都与Session Catalog中的元数据进行了绑定,这个就是一条可执行的逻辑计划,即Resolved LogicalPlan。
2.3 Optimized
上面经过Analyzer阶段后,即生成了可执行的Logical Plan,但SparkSQL不会立即转化为Spark Plan执行,而是会对生成Resolved Logical Plan执行逻辑进行优化,提高运代码行效率,这个阶段叫Optimized。Optimizer优化器的实现和处理方式与上面Analyzer类似定义了一系列的Rule,然后利用这些Rule对Logical Plan和Expression进行迭代处理,其中主要的优化策略是合并,列裁剪和谓词下推等。
def defaultBatches: Seq[Batch] = {
val operatorOptimizationRuleSet =
Seq(
// Operator push down
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin,
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
CollapseWindow,
CombineFilters,
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
NullPropagation,
ConstantPropagation,
FoldablePropagation,
OptimizeIn,
ConstantFolding,
ReorderAssociativeOperator,
LikeSimplification,
BooleanSimplification,
SimplifyConditionals,
RemoveDispensableExpressions,
SimplifyBinaryComparison,
PruneFilters,
EliminateSorts,
SimplifyCasts,
SimplifyCaseConversionExpressions,
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
RemoveRedundantAliases,
RemoveRedundantProject,
SimplifyExtractValueOps,
CombineConcats) ++
extendedOperatorOptimizationRules
val operatorOptimizationBatch: Seq[Batch] = {
val rulesWithoutInferFiltersFromConstraints =
operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
Batch("Operator Optimization before Inferring Filters", fixedPoint,
rulesWithoutInferFiltersFromConstraints: _*) ::
Batch("Infer Filters", Once,
InferFiltersFromConstraints) ::
Batch("Operator Optimization after Inferring Filters", fixedPoint,
rulesWithoutInferFiltersFromConstraints: _*) :: Nil
}
(Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
ComputeCurrentTime,
GetCurrentDatabase(sessionCatalog),
RewriteDistinctAggregates,
ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
// - Do the first call of CombineUnions before starting the major Optimizer rules,
// since it can reduce the number of iteration and the other rules could add/move
// extra operators between two adjacent Union operators.
// - Call CombineUnions again in Batch("Operator Optimizations"),
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
// run this once earlier. this might simplify the plan and reduce cost of optimizer.
// for example, a query such as Filter(LocalRelation) would go through all the heavy
// optimizer rules that are triggered when there is a filter
// (e.g. InferFiltersFromConstraints). if we run this batch earlier, the query becomes just
// LocalRelation and does not trigger many rules
Batch("LocalRelation early", fixedPoint,
ConvertToLocalRelation,
PropagateEmptyRelation) ::
Batch("Pullup Correlated Expressions", Once,
PullupCorrelatedPredicates) ::
Batch("Subquery", Once,
OptimizeSubqueries) ::
Batch("Replace Operators", fixedPoint,
RewriteExceptAll,
RewriteIntersectAll,
ReplaceIntersectWithSemiJoin,
ReplaceExceptWithFilter,
ReplaceExceptWithAntiJoin,
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
Batch("Join Reorder", Once,
CostBasedJoinReorder) :+
Batch("Remove Redundant Sorts", Once,
RemoveRedundantSorts) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
Batch("Object Expressions Optimization", fixedPoint,
EliminateMapObjects,
CombineTypedFilters) :+
Batch("LocalRelation", fixedPoint,
ConvertToLocalRelation,
PropagateEmptyRelation) :+
Batch("Extract PythonUDF From JoinCondition", Once,
PullOutPythonUDFInJoinCondition) :+
// The following batch should be executed after batch "Join Reorder" "LocalRelation" and
// "Extract PythonUDF From JoinCondition".
Batch("Check Cartesian Products", Once,
CheckCartesianProducts) :+
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
ColumnPruning,
CollapseProject,
RemoveRedundantProject) :+
Batch("UpdateAttributeReferences", Once,
UpdateNullabilityInAttributeReferences)
}
Optimizer的优化策略不仅对已绑定的Logical PLan进行优化,而且对Logical Plan中的Expression也进行优化,其原理就是遍历树,然后应用优化Rule。
spark.sql("select name from people where age >= 20")
optimizedPlan: Project [name#3]
+- Filter (age#4L >= 20)
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, TestSpark$Person, true]).name, true, false) AS name#3, assertnotnull(input[0, TestSpark$Person, true]).age AS age#4L]
+- ExternalRDD [obj#2]
可以看到,经过Optimizer阶段,生成的optimizedPlan对比Analyzer阶段生成的Resolved LogicalPlan简化了很多,Project部分只剩下name字段,其他没有可优化的地方保持不变。
2.4 Physical Plan
经过Optimzer的优化,SparkPlanner这个类会使用Strategies将优化后的Logical Plan进行转换,生成可执行的Physical Plan,相比较于Logical Plan,Physical Plan算是Spark能够执行的东西,这里spkarPlan就是相当于Physical Plan。
override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
PythonEvals ::
DataSourceV2Strategy ::
FileSourceStrategy ::
DataSourceStrategy(conf) ::
SpecialLimits ::
Aggregation ::
Window ::
JoinSelection ::
InMemoryScans ::
BasicOperators :: Nil)
/**
* Override to add extra planning strategies to the planner. These strategies are tried after
* the strategies defined in [[ExperimentalMethods]], and before the regular strategies.
*/
def extraPlanningStrategies: Seq[Strategy] = Nil
这里传入一个逻辑计划,生成一个物理计划,即SparkPlan:
lazy val sparkPlan: SparkPlan = {
// We need to materialize the optimizedPlan here because sparkPlan is also tracked under
// the planning phase
assertOptimized()
executePhase(QueryPlanningTracker.PLANNING) {
// Clone the logical plan here, in case the planner rules change the states of the logical
// plan.
QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
}
}
/**
* Transform a [[LogicalPlan]] into a [[SparkPlan]].
*
* Note that the returned physical plan still needs to be prepared for execution.
*/
def createSparkPlan(
sparkSession: SparkSession,
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan = {
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(plan)).next()
}
这里SparkSQL在真正执行时,会调用prepareForExecution将sparkPlan转换成executedPlan,并在sparkPlan中执行过程中,如果出现stage分区规则不同时插入Shuffle操作以及进行一些数据格式转换操作等等:
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = {
// We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
// that the optimization time is not counted as part of the planning phase.
assertOptimized()
executePhase(QueryPlanningTracker.PLANNING) {
// clone the plan to avoid sharing the plan instance between different stages like analyzing,
// optimizing and planning.
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
}
最后,基于executedPlan.execute方法返回一个RDD,之后spark任务就会对这个RDD进行操作,返回结果集。
lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
executedPlan.execute(), sparkSession.sessionState.conf)
/**
* Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
* preparations.
*
* Concrete implementations of SparkPlan should override `doExecute`.
*/
final def execute(): RDD[InternalRow] = executeQuery {
if (isCanonicalizedPlan) {
throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
}
doExecute()
}
这里打印出生成的sparkPlan:
spark.sql("select name from people where age >= 20")
sparkPlan: Project [name#3]
+- Filter (age#4L >= 20)
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, TestSpark$Person, true]).name, true, false) AS name#3, assertnotnull(input[0, TestSpark$Person, true]).age AS age#4L]
+- Scan[obj#2]
最后,打印出可执行的physicalPlan:
sql.explain()
== Physical Plan ==
*(1) Project [name#3]
+- *(1) Filter (age#4L >= 20)
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, TestSpark$Person, true]).name, true, false) AS name#3, assertnotnull(input[0, TestSpark$Person, true]).age AS age#4L]
+- Scan[obj#2]
3. 小结

本文把SparkSQL的解析流程大致的介绍了一遍,核心代码在QueryExecution这个类里面,流程大致可以划分为解析->绑定->优化->逻辑计划转物理计划->预准备->生成rdd,与直接写rdd的最本质的区别SparkSQL的Catalyst会进行逻辑计划的优化RBO(Rule-Based Optimizer),当然Spark在具体的执行的时候,还会执行代价优化--CBO(Cost-Based Optimizer,CBO);而另外一个实时计算框架Flink SQL的引擎采用的是Apache Calcite()实现支持 SQL 语句的解析和验证;HBase可以通过Apache Phoenix()实现SQL驱动,后续有时间也研究总结一下。
参考链接: