Workflow: Basic Usage of Prefect (qbit)
Preface
Software versions
Python 3.8
poetry 1.1.7
prefect 0.15.12
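Installation
After initializing the project with Poetry, add the following dependencies to pyproject.toml, then install them.
# Mirror inside mainland China (optional)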
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true
[tool.poetry.dependencies]
python = "^3.8"
prefect = "~0.15.12"
# Linux
export PREFECT__FLOWS__CHECKPOINTING=true
# Windows powershell
$env:PREFECT__FLOWS__CHECKPOINTING="true"
# Windows cmd (make sure the line has no trailing space)
set PREFECT__FLOWS__CHECKPOINTING=true
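The commands above export the checkpointing switch in three different shells, and a stray space or a forgotten export is easy to miss. As a minimal sketch, the following stdlib-only check reports whether the current process sees the variable. The helper name `checkpointing_enabled` is hypothetical, and comparing against the string "true" is an assumption made for this illustration, not Prefect's own configuration parsing.

```python
import os

ENV_NAME = "PREFECT__FLOWS__CHECKPOINTING"  # variable name taken from the commands above


def checkpointing_enabled(environ=None):
    """Return True if the variable holds the string 'true' (case-insensitive).

    Hypothetical helper for illustration; Prefect parses its config on its own.
    """
    environ = os.environ if environ is None else environ
    # Strip whitespace so a value with a stray trailing space is still recognized here.
    return environ.get(ENV_NAME, "").strip().lower() == "true"


if __name__ == "__main__":
    print(f"{ENV_NAME} enabled: {checkpointing_enabled()}")
```

Running this in the same shell that will launch the flow is a quick way to confirm the export took effect.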
Test code
# encoding: utf-8
# author: qbit
# date: 2022-01-12
# summary: test Prefect with add, subtract, multiply, and divide tasks

import os
import sys
import shutil

import prefect
from prefect import task, Flow
from prefect.engine.results import LocalResult

logger = prefect.context.get("logger")

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
cur_filename = os.path.basename(__file__)
dirname = f".{os.path.splitext(cur_filename)[0]}"  # the cache directory is named after this .py file
PrefectLocalResultDir = os.path.join(cur_dir_fullpath, dirname)


def ClearDirectory(dir):
    r""" Empty the directory """
    for filename in os.listdir(dir):
        file = os.path.join(dir, filename)
        try:
            if os.path.isfile(file) or os.path.islink(file):
                os.remove(file)
            elif os.path.isdir(file):
                shutil.rmtree(file)
        except Exception as e:
            print(f'Failed to delete {file}. Reason: {e}')


@task(target="{task_name}.target", checkpoint=True, result=LocalResult(dir=PrefectLocalResultDir))
def TaskAdd(x, y):
    result = x + y
    logger.info(f"{x} + {y} = {result}")
    return result


@task
def TaskSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    logger.info(f"{x} - 1 = {result}")
    return result


@task
def TaskMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    logger.info(f"{x} * 2 = {result}")
    return result


@task(log_stdout=True)
def TaskDivide(x, y):
    r""" Divide y by x """
    result = y / x
    logger.info(f"{y} / {x} = {result}")
    print(f"result: {result}")  # log_stdout=True sends this print through the task logger
    return result


if __name__ == '__main__':
    # Passing "restart" on the command line clears the cached results first.
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    # The flow name means "Example: the four arithmetic operations".
    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow_state = flow.run()
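The `target="{task_name}.target"` argument on TaskAdd is what lets a later run skip the computation. The idea can be sketched with the standard library alone: render the file name from the template, return the stored value if the file exists, otherwise compute and store it. This is a simplified stand-in under stated assumptions, not Prefect's implementation; `run_with_target`, the pickle format, and the `add` function are all hypothetical names chosen for the illustration.

```python
import os
import pickle
import tempfile


def run_with_target(func, args, result_dir, target_template="{task_name}.target"):
    """Run func(*args) unless its target file already exists in result_dir.

    Returns (state, value), where state is 'Cached' or 'Success'.
    Simplified illustration only; Prefect's real logic is more involved.
    """
    # Render the target file name from the template, e.g. 'add.target'.
    target = os.path.join(result_dir, target_template.format(task_name=func.__name__))
    if os.path.exists(target):
        # The target exists: read the stored value instead of recomputing.
        with open(target, "rb") as fh:
            return "Cached", pickle.load(fh)
    value = func(*args)
    os.makedirs(result_dir, exist_ok=True)
    with open(target, "wb") as fh:
        pickle.dump(value, fh)
    return "Success", value


def add(x, y):
    return x + y


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(run_with_target(add, (2, 1), tmp))  # first call computes and stores the value
        print(run_with_target(add, (2, 1), tmp))  # second call reads the stored file
```

In this sketch the file name does not depend on the arguments, so calling `add` with different inputs would still return the stored value until the directory is cleared, which is the role ClearDirectory plays in the test script.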
Running
First run (note that the final state of the first task, TaskAdd, is 'Success')
# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Second run (note that the final state of the first task, TaskAdd, is 'Cached')
# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
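Comparing the two runs by eye means scanning for the "final state" lines. As a small sketch, a regular expression can pull the final state of each task out of log text like the listings above. The pattern is written against the line format shown in this post only; `final_states` and `SAMPLE` are hypothetical names, and other Prefect versions may format these lines differently.

```python
import re

# Matches lines such as:
#   ... | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
FINAL_STATE = re.compile(
    r"Task '(?P<task>[^']+)': Finished task run for task with final state: '(?P<state>[^']+)'"
)


def final_states(log_text):
    """Map each task name to the final state reported in the log text."""
    return {m.group("task"): m.group("state") for m in FINAL_STATE.finditer(log_text)}


# Two lines copied from the second run above.
SAMPLE = """\
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
"""

if __name__ == "__main__":
    print(final_states(SAMPLE))
```

Feeding the full output of each run to `final_states` makes the difference between them easy to spot: only TaskAdd changes state, since it is the only task declared with a target.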
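Third run, with the restart argument (note that the final state of the first task, TaskAdd, is 'Success' again rather than 'Cached', because the cache directory is cleared first)
# Command
poetry run python ./test_prefect.py restart
# Output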
****** Clear D:\Python3Project\test_prefect\.test_prefect ...
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Static DAG diagram
[tool.poetry.dependencies]
python = "^3.8"
prefect = { version = "~0.15.12", extras = ["viz"] }
Modify the main function of test_prefect.py
if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    # Draw the DAG before the run, then again with the final task states.
    flow.visualize(filename='flow_start', format='png')
    flow_state = flow.run()
    flow.visualize(flow_state=flow_state, filename='flow_end', format='png')
Running the code generates two PNG images, named flow_start and flow_end in the visualize calls
poetry run python ./test_prefect.py restart
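The structure that the two images depict can also be written down by hand. The sketch below lists the four edges read off the `with Flow(...)` block, renders them as Graphviz DOT text, and derives one valid execution order with Kahn's algorithm. It uses only the standard library; `EDGES`, `to_dot`, and `topological_order` are hypothetical names for this illustration, and the DOT text is hand-built, not what `flow.visualize` produces internally.

```python
# Edges of the example flow, read off the `with Flow(...)` block: (upstream, downstream).
EDGES = [
    ("TaskAdd", "TaskSubtract"),
    ("TaskAdd", "TaskMultiply"),
    ("TaskSubtract", "TaskDivide"),
    ("TaskMultiply", "TaskDivide"),
]


def to_dot(edges, name="flow"):
    """Render the edge list as Graphviz DOT text."""
    lines = [f"digraph {name} {{"]
    lines += [f'    "{src}" -> "{dst}";' for src, dst in edges]
    lines.append("}")
    return "\n".join(lines)


def topological_order(edges):
    """Return one valid execution order using Kahn's algorithm."""
    nodes = sorted({n for edge in edges for n in edge})
    indegree = {n: 0 for n in nodes}
    for _, dst in edges:
        indegree[dst] += 1
    ready = [n for n in nodes if indegree[n] == 0]  # tasks with no unmet upstream
    order = []
    while ready:
        node = ready.pop(0)
        order.append(node)
        for src, dst in edges:
            if src == node:
                indegree[dst] -= 1
                if indegree[dst] == 0:
                    ready.append(dst)
    return order


if __name__ == "__main__":
    print(to_dot(EDGES))
    print(topological_order(EDGES))
```

TaskSubtract and TaskMultiply depend only on TaskAdd, so either may run first. The logs above happen to show TaskMultiply before TaskSubtract, which is an equally valid order.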


