workflow 之 Dagster 基本用法(qbit)
前言
软件版本
Python 3.8
poetry 1.1.7
dagster 0.13.13
安装
用 初始化项目后在 添加以下依赖,然后运行
# 国内镜像源(可选)
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true
[tool.poetry.dependencies]
python = "^3.8"
dagster = "~0.13.13"
dagit = "~0.13.13"
测试代码
# encoding: utf-8
# author: qbit
# date: 2022-01-11
# summary: 测试 dagster,加减乘除
from dagster import get_dagster_logger, job, op
loggerDag = get_dagster_logger()
@op
def OpSeed():
r""" 种子函数,生成初始参数 """
return (2, 1)
@op
def OpAdd(seed):
r""" 读入参数做加法 """
x, y = seed
result = x + y
loggerDag.info(f"{x} + {y} = {result}")
return result
@op
def OpSubtract(x):
r""" 读入参数减 1 """
result = x - 1
loggerDag.info(f"{x} - 1 = {result}")
return result
@op
def OpMultiply(x):
r""" 读入参数乘以 2 """
result = x * 2
loggerDag.info(f"{x} * 2 = {result}")
return result
@op
def OpDivide(x, y):
r""" 读入参数做除法 """
result = y / x
loggerDag.info(f"{y} / {x} = {result}")
return result
@job
def arithmetic():
r""" 四则运算 """
seed = OpSeed()
addResult = OpAdd(seed)
subResult = OpSubtract(addResult)
mulResult = OpMultiply(addResult)
OpDivide(subResult, mulResult)
if __name__ == "__main__":
result = arithmetic.execute_in_process(run_config={"loggers": {"console": {"config": {"log_level": "info"}}}})
直接运行
直接使用 运行
poetry run python test_dagster.py
运行结果
2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpAdd - 2 + 1 = 3
2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpMultiply - 3 * 2 = 6
2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpSubtract - 3 - 1 = 2
2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpDivide - 6 / 2 = 3.0
用 运行
编辑配置文件
loggers:
console:
config:
log_level: INFO
运行命令
poetry run dagster job execute -f test_dagster.py -c run_config.yaml
运行结果
To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
2022-01-11 16:35:25 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpAdd - 2 + 1 = 3
2022-01-11 16:35:30 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpSubtract - 3 - 1 = 2
2022-01-11 16:35:30 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpMultiply - 3 * 2 = 6
2022-01-11 16:35:35 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpDivide - 6 / 2 = 3.0
用 运行
运行命令
poetry run dagit -f test_dagster.py
用浏览器打开 查看

点击 标签页,点击 按钮运行,运行结果如下:
