
Workflow: Basic Usage of Prefect (qbit)

Preface

  • Software versions

Python  3.8
poetry  1.1.7
prefect 0.15.12

Installation

  • After initializing the project with Poetry, add the following dependencies to pyproject.toml, then run poetry install

# Mirror for users in mainland China (optional)
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true

[tool.poetry.dependencies]
python = "^3.8"
prefect = "~0.15.12"
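  • Enable checkpointing with an environment variable (for local flow.run() calls, Prefect only persists task results when checkpointing is turned on)

# Linux
export PREFECT__FLOWS__CHECKPOINTING=true
# Windows PowerShell
$env:PREFECT__FLOWS__CHECKPOINTING="true"
# Windows cmd (make sure there is no trailing space at the end of the line)
set PREFECT__FLOWS__CHECKPOINTING=true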
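Prefect 0.x maps every environment variable that starts with PREFECT__ onto its nested configuration, using the double underscores as separators, so PREFECT__FLOWS__CHECKPOINTING corresponds to the flows.checkpointing setting. The sketch below only illustrates that naming convention; the helpers env_to_config and parse_value are hypothetical and simplified (booleans, integers and plain strings only), not Prefect's actual config loader.

```python
# Hypothetical helper illustrating Prefect 0.x's env-var naming convention:
#   PREFECT__SECTION__KEY=value  ->  {"section": {"key": value}}
# Simplified sketch, NOT Prefect's real config loader.
from typing import Any, Dict, Mapping

PREFIX = "PREFECT__"


def parse_value(raw: str) -> Any:
    """Convert a few common string forms into Python values (simplified)."""
    lowered = raw.lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    try:
        return int(raw)
    except ValueError:
        return raw  # anything else stays a plain string


def env_to_config(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Build a nested dict from every PREFECT__* variable in `environ`."""
    config: Dict[str, Any] = {}
    for name, raw in environ.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variables such as PATH are ignored
        keys = name[len(PREFIX):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})  # walk/create nested sections
        node[keys[-1]] = parse_value(raw)
    return config


if __name__ == "__main__":
    demo = {"PREFECT__FLOWS__CHECKPOINTING": "true", "PATH": "/usr/bin"}
    print(env_to_config(demo))  # {'flows': {'checkpointing': True}}
```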

Test code (test_prefect.py)

# encoding: utf-8
# author: qbit
# date: 2022-01-12
# summary: test prefect with addition, subtraction, multiplication and division

import os
import sys
import shutil
import prefect
from prefect import task, Flow
from prefect.engine.results import LocalResult

logger = prefect.context.get("logger")
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
cur_filename = os.path.basename(__file__)
dirname = f".{os.path.splitext(cur_filename)[0]}"       # use the current .py file name as the cache directory name
PrefectLocalResultDir = os.path.join(cur_dir_fullpath, dirname)

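def ClearDirectory(dir):
    r""" Empty a directory (does nothing if it does not exist yet) """
    if not os.path.isdir(dir):
        return      # e.g. "restart" before the first run: no cache directory yet
    for filename in os.listdir(dir):
        file = os.path.join(dir, filename)
        try:
            if os.path.isfile(file) or os.path.islink(file):
                os.remove(file)
            elif os.path.isdir(file):
                shutil.rmtree(file)
        except Exception as e:
            print(f'Failed to delete {file}. Reason: {e}')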

@task(target="{task_name}.target", checkpoint=True, result=LocalResult(dir=PrefectLocalResultDir))
def TaskAdd(x, y):
    result = x + y
    logger.info(f"{x} + {y} = {result}")
    return result

@task
def TaskSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    logger.info(f"{x} - 1 = {result}")
    return result

@task
def TaskMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    logger.info(f"{x} * 2 = {result}")
    return result

@task(log_stdout=True)
def TaskDivide(x, y):
    r""" Divide y by x """
    result = y / x
    logger.info(f"{y} / {x} = {result}")
    print(f"result: {result}")      # log_stdout=True forwards this print to the Prefect log
    return result

if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow_state = flow.run()
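In Prefect 0.x, calling a task inside the with Flow(...) block does not execute it; it adds the task to the flow and records which tasks feed into it, and flow.run() later executes the tasks in an order that respects those dependencies. The standalone sketch below reproduces that ordering for this example's four tasks with Kahn's topological sort over a hand-written edge list; the names mirror the tasks above, but the code is an illustration of the idea, not Prefect's scheduler. TaskSubtract and TaskMultiply do not depend on each other, so either may run first, which is why the log below can show TaskMultiply before TaskSubtract.

```python
# Standalone sketch: topological ordering of the example's four tasks
# using Kahn's algorithm. Same idea as a DAG scheduler, NOT Prefect's code.
from collections import deque
from typing import Dict, List, Tuple

# (upstream, downstream) edges mirroring the flow built in the main block
EDGES: List[Tuple[str, str]] = [
    ("TaskAdd", "TaskSubtract"),
    ("TaskAdd", "TaskMultiply"),
    ("TaskSubtract", "TaskDivide"),
    ("TaskMultiply", "TaskDivide"),
]


def topological_order(edges: List[Tuple[str, str]]) -> List[str]:
    """Return nodes ordered so every upstream task precedes its downstream tasks."""
    indegree: Dict[str, int] = {}
    children: Dict[str, List[str]] = {}
    for up, down in edges:
        indegree.setdefault(up, 0)
        indegree[down] = indegree.get(down, 0) + 1
        children.setdefault(up, []).append(down)
    # start with tasks that have no upstream dependencies
    ready = deque(sorted(n for n, d in indegree.items() if d == 0))
    order: List[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children.get(node, []):
            indegree[child] -= 1
            if indegree[child] == 0:  # all inputs of `child` are done
                ready.append(child)
    if len(order) != len(indegree):
        raise ValueError("cycle detected: not a DAG")
    return order


if __name__ == "__main__":
    print(topological_order(EDGES))
    # ['TaskAdd', 'TaskSubtract', 'TaskMultiply', 'TaskDivide']
```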

Running

  • First run (note that the final state of the first task, TaskAdd, is 'Success')

# Command
poetry run python ./test_prefect.py
# Output
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:14:56+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:14:56+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:14:56+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
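  • Second run (note that the final state of the first task, TaskAdd, is now 'Cached': its result is read from the target file instead of being recomputed, so no "2 + 1 = 3" line appears)

# Command
poetry run python ./test_prefect.py
# Output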
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:21:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:21:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:21:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
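  • Third run with the restart argument (the cache directory is cleared first, so the final state of TaskAdd is 'Success' again)

# Command
poetry run python ./test_prefect.py restart
# Output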
****** Clear D:\Python3Project\test_prefect\.test_prefect ...
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 2 + 1 = 3
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 * 2 = 6
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 3 - 1 = 2
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-01-13 14:24:12+0800] INFO - prefect | 6 / 2 = 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-01-13 14:24:12+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-01-13 14:24:12+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
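The three runs show how target works: TaskAdd declares target="{task_name}.target" with a LocalResult directory, so once TaskAdd.target exists in that directory the task is reported as 'Cached' and not executed; clearing the directory (the restart argument) forces it to run again. Only TaskAdd has a target, so the other three tasks run every time. Because the target name does not include the inputs, changing TaskAdd's arguments would still return the stored value until the file is removed. The sketch below is a simplified standalone imitation of that check-then-compute behavior using pickle; run_with_target and its arguments are hypothetical and do not reproduce Prefect's own result serialization.

```python
# Simplified imitation of target-based caching (NOT Prefect's implementation):
# if the target file exists, load the stored result; otherwise compute and store it.
import os
import pickle
import tempfile
from typing import Any, Callable, Tuple


def run_with_target(func: Callable[..., Any], args: Tuple[Any, ...],
                    result_dir: str, target: str) -> Tuple[Any, str]:
    """Return (value, state), where state is 'Cached' or 'Success'."""
    path = os.path.join(result_dir, target)
    if os.path.exists(path):
        # the target exists: skip the computation entirely
        with open(path, "rb") as fh:
            return pickle.load(fh), "Cached"
    value = func(*args)
    os.makedirs(result_dir, exist_ok=True)
    with open(path, "wb") as fh:
        pickle.dump(value, fh)  # persist so the next run can reuse it
    return value, "Success"


def add(x: int, y: int) -> int:
    return x + y


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as cache_dir:
        target = "{task_name}.target".format(task_name="TaskAdd")
        print(run_with_target(add, (2, 1), cache_dir, target))  # (3, 'Success')
        print(run_with_target(add, (2, 1), cache_dir, target))  # (3, 'Cached')
        os.remove(os.path.join(cache_dir, target))  # like the "restart" argument
        print(run_with_target(add, (2, 1), cache_dir, target))  # (3, 'Success')
```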

Static DAG diagram

  • Official documentation:

  • Download Graphviz and add it to the PATH environment variable

  • Edit pyproject.toml to add the viz extra, then update the installed dependencies (for example with poetry update prefect)

[tool.poetry.dependencies]
python = "^3.8"
prefect = { version = "~0.15.12", extras = ["viz"] }
  • Modify the main block of test_prefect.py

if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow.visualize(filename='flow_start', format='png')
    flow_state = flow.run()
    flow.visualize(flow_state=flow_state, filename='flow_end', format='png')
  • Running the code generates two images, flow_start.png and flow_end.png

poetry run python ./test_prefect.py restart

  • What the colors mean (task states):
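flow.visualize() builds a Graphviz graph of the flow and renders it, which is why the Graphviz executables have to be on PATH. To show what kind of DOT description gets rendered, the standalone sketch below writes DOT text for this example's four-task DAG by hand; the helper to_dot and the fill color are hypothetical and do not reproduce Prefect's own layout or state colors. Saving the printed text as flow.dot and running dot -Tpng flow.dot -o flow.png would render it with Graphviz.

```python
# Hypothetical helper: emit Graphviz DOT source for a small task DAG.
# Colors here are illustrative only, NOT Prefect's state colors.
from typing import Dict, List, Optional, Tuple

EDGES: List[Tuple[str, str]] = [
    ("TaskAdd", "TaskSubtract"),
    ("TaskAdd", "TaskMultiply"),
    ("TaskSubtract", "TaskDivide"),
    ("TaskMultiply", "TaskDivide"),
]


def to_dot(edges: List[Tuple[str, str]],
           colors: Optional[Dict[str, str]] = None) -> str:
    """Return DOT text; `colors` optionally maps a node name to a fill color."""
    colors = colors or {}
    nodes = sorted({n for edge in edges for n in edge})
    lines = ["digraph flow {"]
    for node in nodes:
        # only colored nodes get the filled style
        style = f' style=filled fillcolor="{colors[node]}"' if node in colors else ""
        lines.append(f'    "{node}" [label="{node}"{style}];')
    for up, down in edges:
        lines.append(f'    "{up}" -> "{down}";')
    lines.append("}")
    return "\n".join(lines)


if __name__ == "__main__":
    print(to_dot(EDGES, colors={"TaskAdd": "lightgreen"}))
```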

This article originally appeared on