Bootstrap

Spark支持自定义Python环境

背景

现有大数据平台的Spark版本是2.1.0,Python版本2.7和3.6 ,通过PySpark+Jupyter方式提供服务。Python 2年岁久远,升级支持Python 3

遇到的问题

1.平台其他应用程序需要Python 2,升级不能影响已有的环境;

2.Spark集群有数百台机器,在每个节点上安装运行环境工作量大且出错概率高;

处理过程

Spark 2.1.0最高支持的Python版本是Python 3.5(),不能使用已有的3.6,PySpark的API不兼容。

下载Python 3.5源码,指定路径编译安装

./configure --prefix='/usr/local/python3.5' --enable-optimizations && make && make install

Python装好后安装Jupyter内核和第三方依赖库

curl https://bootstrap.pypa.io/3.5/get-pip.py -o get-pip.py
/usr/local/python3.5/bin/python3.5 get-pip.py
/usr/local/python3.5/bin/pip3 install ipykernel

打包依赖,上传的分布式文件系统

zip python35.zip -rq /usr/local/python3.5
hadoop fs -put python35.zip /tmp

编写测试脚本

import random
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('PI').enableHiveSupport().getOrCreate()
num_samples = 100000000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = spark.sparkContext.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print('PI:',pi)

提交Spark任务,验证是否正常

bin/spark-submit --name pyspark --num-executors 4  \
--master yarn --deploy-mode client \
--conf spark.pyspark.python=./python35/python3.5/bin/python3 \
--conf spark.pyspark.driver.python=/usr/local/python3.5/bin/python3.5 \
--conf spark.yarn.dist.archives=hdfs://hdfsCluster/tmp/python35.zip#python35 \
test.py

注意这里PYSPARK_PYTHON变量中python35是hdfs上zip包解压后的路径。若上述命令能看到输出PI的结果,则Spark已可用。

现有平台的Notebook是定制过的,使用pyspark-shell的内核。

可看到,pyspark-shell其实是在启动时初始化了一些环境,如spark,sql,sc等,

这里增加创建Session的参数即可

spark = SparkSession.builder\
.enableHiveSupport()\
.config('spark.yarn.dist.archives','hdfs://hdfsCluster/tmp/python35.zip#python35')\ 
.config('spark.submit.deployMode','client')\
.getOrCreate()

修改内核文件kernel.json,指定环境变量和Python路径

{
	"display_name": "pySpark",
	"language": "python",
	"argv": [
		"/usr/local/python3.5/bin/python3",
		"-m",
		"ipykernel_launcher",
		"-f",
		"{connection_file}"
	],
	"env": {
		"PYSPARK_PYTHON": "./python35/python3.5/bin/python3",
		"PYSPARK_DRIVER_PYTHON": "/usr/local/python3.5/bin/python3",
	}
}

重启Notebook生效

总结思考

上面通过archive的方式解决了运行环境分发的问题,避免了在每个worker节点机械部署的操作。

  • archive的方式也有不足,即计算时每个Executor均需要从分布式文件系统下载环境,增加了网络IO等开销,开启动态资源伸缩功能时消耗更多;当然总体上还是利大于弊。

  • 这里的archive其实是制作运行时的过程,类似容器技术的镜像,自然地可以想到用Docker的方式来处理。在没有历史包袱的场景,该问题用K8s可以轻松得到解决,并且Jupyter多租、Spark调度均有很好的支持。

  • 从解耦角度上,Jupyter使用标准的Python内核会更合适,PySpark仅作为Spark的Client库,交由用户创建Session,也减轻了平台的负担。比如这个场景中,若Python库有更新需要打包,则所有用户都会感受到,多数时候不一定是好事。