使用MLlib进行机器学习(十-下)
写在前面:
大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与AI相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。
业余时间专注于输出大数据、AI等相关文章,目前已经输出了40万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。
想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。
内推信息
如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。
免费学习资料
如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!
学习交流群
如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。
超参数调整
当数据科学家谈论调整模型时,他们经常讨论调整超参数以提高模型的预测能力。一个
在本节中,我们将重点介绍使用基于树的模型作为超参数调整过程的示例,但是相同的概念也适用于其他模型。设置好要使用spark.ml进行超参数调整的机制后,我们将讨论优化管道的方法。让我们从简要介绍决策树开始,然后介绍如何在spark.ml中使用决策树。
基于树的模型
基于树的模型(例如决策树,梯度提升树和随机森林)是相对简单但功能强大的模型,易于解释(意味着,很容易解释它们做出的预测)。因此,它们在机器学习任务中非常受欢迎。我们将很快进入随机森林,但首先我们需要介绍决策树的基础知识。
决策树
作为一种现成的解决方案,决策树非常适合数据挖掘。它们的构建速度相对较快,可高度解释,并且具有尺度不变性(即,标准化或缩放数字特征不会改变树的性能)。那么什么是决策树?
决策树是从你的数据中学到的一系列if-then-else规则,用于分类或回归任务。假设我们正在尝试建立一个模型来预测某人是否会接受工作机会,并且这些特征包括薪水,通勤时间,免费咖啡等。如果我们将决策树拟合到该数据集,我们可能会得到一个模型看起来像图10-9。

树顶部的节点称为树的“根”,因为它是我们“拆分”的第一个特征。此功能应提供最有益的信息分配-在这种情况下,如果薪水低于50,000美元,则大多数候选人将拒绝这份工作。“拒绝报价”节点称为“叶节点”,因为该节点没有其他拆分。它在分支的末尾。(是的,我们称它为决策“树”,但在树的根部画在顶部,在树的叶子画在底部,这有点有趣!)
但是,如果提供的薪水大于$ 50,000,我们将继续执行决策树中下一个最有用的功能,在这种情况下,这是通勤时间。即使工资超过$ 50,000,如果通勤时间超过一小时,那么大多数人都会拒绝这份工作。
我们不会在这里详细介绍如何确定哪些功能可以为你带来最大的信息收益,但是如果你有兴趣,请参阅Trevor Hastie,Robert Tibshirani和Jerome撰写的《The Elements of Statistical Learning》第9章。
我们模型的最后一个特征是免费咖啡。在这种情况下,决策树显示,如果薪水高于$ 50,000,上下班不到一个小时,并且有免费的咖啡,那么大多数人都会接受我们的工作机会(如果就这么简单!)。作为后续资源,R2D3对决策树的工作方式具有很好的可视化效果。
可以在单个决策树中对同一要素进行多次拆分,但是每次拆分将以不同的值发生。
决策树的
在解释了决策树的本质之后,让我们继续进行决策树特征准备的主题。对于决策树,你不必担心标准化或缩放输入特征,因为这对拆分没有影响,但是你必须小心如何准备类别特征。
基于树的方法可以自然地处理类别变量。在spark.ml,你只需要将类别列传递到StringIndexer,决策树就可以处理其余的事情。让我们用决策树拟合我们的数据集:
In Python
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(labelCol="price")
Filter for just numeric columns (and exclude price, our label)
numericCols = [field for (field, dataType) in trainDF.dtypes
if ((dataType == "double") & (field != "price"))]
Combine output of StringIndexer defined above and numeric columns
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
Combine stages into pipeline
stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(trainDF) # This line should error
// In Scala
import org.apache.spark.ml.regression.DecisionTreeRegressor
val dt = new DecisionTreeRegressor()
.setLabelCol("price")
// Filter for just numeric columns (and exclude price, our label)
val numericCols = trainDF.dtypes.filter{ case (field, dataType) =>
dataType == "DoubleType" && field != "price"}.map(_._1)
// Combine output of StringIndexer defined above and numeric columns
val assemblerInputs = indexOutputCols ++ numericCols
val vecAssembler = new VectorAssembler()
.setInputCols(assemblerInputs)
.setOutputCol("features")
// Combine stages into pipeline
val stages = Array(stringIndexer, vecAssembler, dt)
val pipeline = new Pipeline()
.setStages(stages)
val pipelineModel = pipeline.fit(trainDF) // This line should error
这将产生以下错误:
java.lang.IllegalArgumentException: requirement failed: DecisionTree requires
maxBins (= 32) to be at least as large as the number of values in each
categorical feature, but categorical feature 3 has 36 values. Consider removing
this and other categorical features with a large number of values, or add more
training examples.
我们可以看到参数maxBins存在问题。该参数有什么作用?maxBins是用来确定将连续特征离散化或分割成的参数。该离散化步骤对于执行分布式训练至关重要。scikit-learn之所以没有maxBins参数,是因为所有数据和模型都驻留在一台机器上。但是,在Spark中,工作节点具有数据的所有列,但只有行的一部分。因此,在讨论要分割的特征和值时,我们需要确保它们都在谈论相同的分割值,这是我们在训练时建立的常见离散化得到的。让我们看一下图10-10,它显示了PLANET分布式决策树的实现,以更好地理解分布式机器学习并说明maxBins参数。

每个worker都必须计算每个特征和每个可能的分割点的摘要统计信息,并且这些统计信息将汇总到各个worker中。MLlib要求maxBins足够大以处理类别列的离散化。maxBins的默认值是32,并且我们有一个包含36个不同值的类别列,这就是为什么我们更早得到错误的原因。尽管我们可以增加maxBins到64以更准确地表示连续特征,但这会使连续变量可能的分割数量翻倍,从而大大增加了计算时间。让我们改为将maxBins设置为40并重新训练管道。在这里你会注意到我们正在使用setter方法setMaxBins() 修改决策树而不是完全重新定义它:
In Python
dt.setMaxBins(40)
pipelineModel = pipeline.fit(trainDF)
// In Scala
dt.setMaxBins(40)
val pipelineModel = pipeline.fit(trainDF)
由于实现上的差异,使用scikit-learn与MLlib建立模型时,通常不会得到完全相同的结果。但是,没关系。关键是要了解它们为何不同,并查看包含哪些参数以使它们按照你所期望的方式执行。如果要将工作负载从scikit-learn移植到MLlib,建议你查看spark.ml和scikit-learn文档,以了解哪些参数有所不同,并对这些参数进行调整以获得相同数据的可比结果。值足够接近后,你可以将MLlib模型扩展到scikit-learn无法处理的较大数据大小。
现在我们已经成功构建了模型,我们可以提取决策树学习的if-then-else规则:
In Python
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)
// In Scala
val dtModel = pipelineModel.stages.last
.asInstanceOf[org.apache.spark.ml.regression.DecisionTreeRegressionModel]
println(dtModel.toDebugString)
DecisionTreeRegressionModel: uid=dtr_005040f1efac, depth=5, numNodes=47,...
If (feature 12 <= 2.5)
If (feature 12 <= 1.5)
If (feature 5 in {1.0,2.0})
If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
If (feature 3 in
{0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,...})
Predict: 104.23992784125075
Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,...})
Predict: 250.7111111111111
...
这只是打印输出的一个子集,但你会注意到,可以对同一特征进行多次拆分(例如,特征12),但拆分值不同。还要注意决策树在数字特征和类别特征上的分割方式之间的区别:对于数字特征,它检查值是否小于或等于阈值;对于类别特征,它检查值是否在某个集合中。
我们还可以从模型中提取特征重要性得分,以查看最重要的特征:
In Python
import pandas as pd
featureImp = pd.DataFrame(
list(zip(vecAssembler.getInputCols(), dtModel.featureImportances)),
columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)
// In Scala
val featureImp = vecAssembler
.getInputCols.zip(dtModel.featureImportances.toArray)
val columns = Array("feature", "Importance")
val featureImpDF = spark.createDataFrame(featureImp).toDF(columns: _*)
featureImpDF.orderBy($"Importance".desc).show()


尽管决策树非常灵活且易于使用,但它们并非始终是最准确的模型。如果我们要在测试数据集上计算
让我们看一下通过使用一种集成方法来改进此模型,该方法结合了不同的模型以获得更好的结果,这就是随机森林。
随机森林
通过采取民主的方式来工作。想象一下,一个罐子里有很多M&Ms(美国巧克力豆品牌)。你让一百个人猜测并购的数量,然后取所有猜测的平均值。该平均值可能比大多数个人的猜测更接近真实值。相同的概念适用于机器学习模型。如果你构建许多模型并组合/平均它们的预测,则它们将比任何单个模型生成的预测更可靠。
按行Bootstrapping样本
按列随机选择特征
bagging的主要缺点是树都是高度相关的,因此可以在数据中学习相似的模式。为了减轻这个问题,每次要进行拆分时,只考虑列的随机子集(RandomForestRegressor只需要1/3特征以及提供√#features给RandomForestClassifier)。由于你引入了这种随机性,因此你通常希望每棵树都比较浅。你可能在想:这些树中的每一个都会比任何单个决策树表现更差,那么这种方法可能会更好吗?事实证明,每棵树都从你的数据集中学到了一些不同的东西,并将这些“弱”学习者集合成一个整体,使森林比单个决策树更健壮。
图10-11说明了训练时的随机森林。在每次拆分时,都会考虑要拆分的10个原始特征中的3个。最终,它从其中选出了最好的。

随机森林和决策树的API相似,并且都可以应用于回归或分类任务:
In Python
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)
// In Scala
import org.apache.spark.ml.regression.RandomForestRegressor
val rf = new RandomForestRegressor()
.setLabelCol("price")
.setMaxBins(40)
.setSeed(42)
训练完随机森林后,你就可以通过经过训练过的集成的不同树节点来传递新的数据点。
正如图10-12所示,如果你建立一个分类随机森林,它通过森林中的每颗树的测试点,并基于单个树的预测进行多数投票。(相比之下,在回归中,随机森林只是对这些预测求平均值)尽管这些树的性能都不如任何单独的决策树,但集合(或集成)实际上提供了更健壮的模型。

随机森林真正展示了使用Spark进行分布式机器学习的能力,因为每棵树都可以独立于其他树而构建(例如,在构建树10之前不需要构建树3)。此外,在树的每层,你可以并行化工作以找到最佳拆分。
那么,我们如何确定随机森林中的最佳树个数或这些树的最大深度呢?此过程称为
k折交叉验证
我们应该使用哪个数据集来确定最佳超参数值?如果我们使用训练集,则该模型可能会过拟合或记忆我们的训练数据的细微差别。这意味着将其推广到看不见的数据的可能性较小。但是,如果我们使用测试集,那么它将不再代表“看不见的”数据,因此我们将无法使用它来验证模型的泛化能力。因此,我们需要另一个数据集来帮助我们确定最佳超参数:
例如,与其像以前那样将数据拆分为80/20 训练/测试拆分,我们可以进行60/20/20拆分以分别生成训练、验证和测试数据集。然后,我们可以在训练集上构建模型,在验证集上评估性能以选择最佳超参数配置,然后将模型应用于测试集,以查看其在新数据上的性能如何。但是,这种方法的缺点之一是,我们损失了25%的训练数据(80%-> 60%),这写数据本可以用来帮助改进模型。这促使了使用
使用这种方法,我们没有像以前那样将数据集分为单独的训练,验证和测试集,而是将其分为训练和测试集,但是我们将训练数据用于训练和验证。为此,我们将训练数据分为

如此图所示,如果我们将数据分成三折,则首先在数据的第一折和第二折(或分割)上训练模型,然后在第三折上进行评估。然后,我们在数据的第一和第三折上建立具有相同超参数的相同模型,并在第二折上评估其效果。最后,我们在第二和第三折上建立模型,并在第一折上进行评估。然后,我们将这三个(或
确定超参数的搜索空间可能很困难,并且经常对超参数进行随机搜索要优于结构化网格搜索。有专门的库,例如Hyperopt,可帮助你确定最佳的超参数配置,我们将在第11章中进行介绍。
要在Spark中执行超参数搜索,请执行以下步骤:
1. 定义你要评估的estimator。
2. 使用ParamGridBuilder指定要更改的超参数以及它们各自的有效值。
3. 定义一个evaluator,以指定用于比较各种模型的指标。
4. 使用CrossValidator进行交叉验证,评估各个模型。
让我们从定义管道预估器开始:
In Python
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])
// In Scala
val pipeline = new Pipeline()
.setStages(Array(stringIndexer, vecAssembler, rf))
对于我们ParamGridBuilder,我们将最大深度maxDepth更改为2、4或6,并且将numTrees(随机森林中的树数)更改为10或100。这将给我们一个总共6(3x2)个不同超参数配置的网格:
(maxDepth = 2,numTrees = 10)
(maxDepth = 2,numTrees = 100)
(maxDepth = 4,numTrees = 10)
(maxDepth = 4,numTrees = 100)
(maxDepth = 6,numTrees = 10)
(maxDepth = 6,numTrees = 100)
In Python
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(rf.maxDepth, [2, 4, 6])
.addGrid(rf.numTrees, [10, 100])
.build())
// In Scala
import org.apache.spark.ml.tuning.ParamGridBuilder
val paramGrid = new ParamGridBuilder()
.addGrid(rf.maxDepth, Array(2, 4, 6))
.addGrid(rf.numTrees, Array(10, 100))
.build()
现在我们已经建立了超参数网格,我们需要定义如何评估每个模型以确定哪个模型表现最好。对于此任务,我们将使用RegressionEvaluator,并将RMSE用作我们的关注指标:
In Python
evaluator = RegressionEvaluator(labelCol="price",
predictionCol="prediction",
metricName="rmse")
// In Scala
val evaluator = new RegressionEvaluator()
.setLabelCol("price")
.setPredictionCol("prediction")
.setMetricName("rmse")
我们将使用CrossValidator来执行我们的
In Python
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3,
seed=42)
cvModel = cv.fit(trainDF)
// In Scala
import org.apache.spark.ml.tuning.CrossValidator
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
.setSeed(42)
val cvModel = cv.fit(trainDF)
输出结果告诉我们该操作花费了多长时间:
Command took 1.07 minutes
那么,我们只训练了多少个模型?如果你回答18(6个超参数配置x 3倍交叉验证),这个答案很接近。一旦确定最佳的超参数配置后,如何将这三个(或
要检查交叉验证器的结果,可以看一下avgMetrics:
In Python
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))
// In Scala
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics)
这是输出:
res1: Array[(org.apache.spark.ml.param.ParamMap, Double)] =
Array(({
rfr_a132fb1ab6c8-maxDepth: 2,
rfr_a132fb1ab6c8-numTrees: 10
},303.99522869739343), ({
rfr_a132fb1ab6c8-maxDepth: 2,
rfr_a132fb1ab6c8-numTrees: 100
},299.56501993529474), ({
rfr_a132fb1ab6c8-maxDepth: 4,
rfr_a132fb1ab6c8-numTrees: 10
},310.63687030886894), ({
rfr_a132fb1ab6c8-maxDepth: 4,
rfr_a132fb1ab6c8-numTrees: 100
},294.7369599168999), ({
rfr_a132fb1ab6c8-maxDepth: 6,
rfr_a132fb1ab6c8-numTrees: 10
},312.6678169109293), ({
rfr_a132fb1ab6c8-maxDepth: 6,
rfr_a132fb1ab6c8-numTrees: 100
},292.101039874209))
我们可以看到,通过CrossValidator得到我们的最佳模型(RMSE最低的模型)有maxDepth=6和numTrees=100。但是,这需要很长时间去运行。在下一节中,我们将研究如何在保持相同模型性能的同时减少训练模型的时间。
优化管道
如果你的代码花了足够长的时间让你考虑改进它,那么你应该对其进行优化。在前面的代码中,即使交叉验证器中的每个模型在技术上都是独立的,spark.ml实际上还是按顺序而不是并行地训练模型集合。在Spark 2.3中,引入了一个参数parallelism来解决此问题。此参数确定要并行训练的模型的数量,这些模型本身可以并行拟合。从《Spark Tuning Guide》中:
应该谨慎选择parallelism的值,以在不超出群集资源的情况下最大程度地提高并行度,并且较大的值可能并不总是会导致性能提高。一般而言,对于大多数集群,最大为10的值应该足够。
让我们将此值设置为4,看看是否可以更快地进行训练:
In Python
cvModel = cv.setParallelism(4).fit(trainDF)
// In Scala
val cvModel = cv.setParallelism(4).fit(trainDF)
答案是肯定的:
Command took 31.45 seconds
我们将训练时间缩短了一半(从1.07分钟缩短至31.45秒),但我们仍可以进一步改进它!我们还可以使用另一种技巧来加快模型训练的速度:将交叉验证器放入管道中(例如,Pipeline(stages=[..., cv]),而不是将管道放入交叉验证器中(例如CrossValidator(estimator=pipeline, ...))。每次交叉验证器评估管道时,它都会运行即使模型中的某些步骤没有更改,比如StringIndexer,也可以通过每个模型的管道的每个步骤进行操作,通过重新评估管道中的每个步骤,即使没有改变,我们也要一遍又一遍地学习相同StringIndexer的映射。
如果我们将交叉验证器放入管道中,则StringIndexer每次尝试其他模型时,我们都不会重新评估StringIndexer(或任何其他估算器):
In Python
cv = CrossValidator(estimator=rf,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3,
parallelism=4,
seed=42)
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)
// In Scala
val cv = new CrossValidator()
.setEstimator(rf)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
.setParallelism(4)
.setSeed(42)
val pipeline = new Pipeline()
.setStages(Array(stringIndexer, vecAssembler, cv))
val pipelineModel = pipeline.fit(trainDF)
结果就是我们的训练时间减少了五秒钟:
Command took 26.21 seconds
由于有了parallelism参数并重新排列了管道的顺序,最后一次运行才是最快的。如果将其应用于测试数据集,你将看到获得相同的结果。尽管这些增益大约是几秒钟,但相同的技术也适用于更大的数据集和模型,并相应地节省了更多时间。你可以通过访问本书的GitHub repo库中的笔记本(notebook)来尝试自己运行此代码。
概括
在本章中,我们介绍了如何使用Spark MLlib(特别是基于DataFrame的API包spark.ml)构建管道。我们讨论了转换器和预估器之间的差异,以及如何使用Pipeline API组合它们以及评估模型的一些不同指标。然后,我们探索了如何使用交叉验证执行超参数调整以提供最佳模型,以及在Spark中优化交叉验证和模型训练的技巧。
所有这些内容都为下一章奠定了基础,在下一章中,我们将讨论部署策略以及使用Spark管理和扩展机器学习管道的方法。