Spark支持自定义Python环境
背景
现有大数据平台的Spark版本是2.1.0,Python版本2.7和3.6 ,通过PySpark+Jupyter方式提供服务。Python 2年岁久远,升级支持Python 3
遇到的问题
1.平台其他应用程序需要Python 2,升级不能影响已有的环境;
2.Spark集群有数百台机器,在每个节点上安装运行环境工作量大且出错概率高;
处理过程
Spark 2.1.0最高支持的Python版本是Python 3.5(),不能使用已有的3.6,PySpark的API不兼容。
下载Python 3.5源码,指定路径编译安装
./configure --prefix='/usr/local/python3.5' --enable-optimizations && make && make install
Python装好后安装Jupyter内核和第三方依赖库
curl https://bootstrap.pypa.io/3.5/get-pip.py -o get-pip.py
/usr/local/python3.5/bin/python3.5 get-pip.py
/usr/local/python3.5/bin/pip3 install ipykernel
打包依赖,上传的分布式文件系统
zip python35.zip -rq /usr/local/python3.5
hadoop fs -put python35.zip /tmp
编写测试脚本
import random
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('PI').enableHiveSupport().getOrCreate()
num_samples = 100000000
def inside(p):
x, y = random.random(), random.random()
return x*x + y*y < 1
count = spark.sparkContext.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print('PI:',pi)
提交Spark任务,验证是否正常
bin/spark-submit --name pyspark --num-executors 4 \
--master yarn --deploy-mode client \
--conf spark.pyspark.python=./python35/python3.5/bin/python3 \
--conf spark.pyspark.driver.python=/usr/local/python3.5/bin/python3.5 \
--conf spark.yarn.dist.archives=hdfs://hdfsCluster/tmp/python35.zip#python35 \
test.py
注意这里PYSPARK_PYTHON变量中python35是hdfs上zip包解压后的路径。若上述命令能看到输出PI的结果,则Spark已可用。
现有平台的Notebook是定制过的,使用pyspark-shell的内核。
由可看到,pyspark-shell其实是在启动时初始化了一些环境,如spark,sql,sc等,
这里增加创建Session的参数即可
spark = SparkSession.builder\
.enableHiveSupport()\
.config('spark.yarn.dist.archives','hdfs://hdfsCluster/tmp/python35.zip#python35')\
.config('spark.submit.deployMode','client')\
.getOrCreate()
修改内核文件kernel.json,指定环境变量和Python路径
{
"display_name": "pySpark",
"language": "python",
"argv": [
"/usr/local/python3.5/bin/python3",
"-m",
"ipykernel_launcher",
"-f",
"{connection_file}"
],
"env": {
"PYSPARK_PYTHON": "./python35/python3.5/bin/python3",
"PYSPARK_DRIVER_PYTHON": "/usr/local/python3.5/bin/python3",
}
}
重启Notebook生效

总结思考
上面通过archive的方式解决了运行环境分发的问题,避免了在每个worker节点机械部署的操作。
archive的方式也有不足,即计算时每个Executor均需要从分布式文件系统下载环境,增加了网络IO等开销,开启动态资源伸缩功能时消耗更多;当然总体上还是利大于弊。
这里的archive其实是制作运行时的过程,类似容器技术的镜像,自然地可以想到用Docker的方式来处理。在没有历史包袱的场景,该问题用K8s可以轻松得到解决,并且Jupyter多租、Spark调度均有很好的支持。
从解耦角度上,Jupyter使用标准的Python内核会更合适,PySpark仅作为Spark的Client库,交由用户创建Session,也减轻了平台的负担。比如这个场景中,若Python库有更新需要打包,则所有用户都会感受到,多数时候不一定是好事。