
Workflow: Dagster Basics (qbit)

Preface

  • Software versions

Python  3.8
poetry  1.1.7
dagster 0.13.13

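Installation

  • After initializing the project with poetry, add the following dependencies to the project configuration, then install them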

# Mirror in mainland China (optional)
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true

[tool.poetry.dependencies]
python = "^3.8"
dagster = "~0.13.13"
dagit = "~0.13.13"

Test code

# encoding: utf-8
# author: qbit
# date: 2022-01-11
# summary: test dagster with addition, subtraction, multiplication and division
from dagster import get_dagster_logger, job, op

loggerDag = get_dagster_logger()

@op
def OpSeed():
    r""" Seed op: produce the initial arguments """
    return (2, 1)

@op
def OpAdd(seed):
    r""" Read the seed pair and add its two values """
    x, y = seed
    result = x + y
    loggerDag.info(f"{x} + {y} = {result}")
    return result

@op
def OpSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    loggerDag.info(f"{x} - 1 = {result}")
    return result

@op
def OpMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    loggerDag.info(f"{x} * 2 = {result}")
    return result

@op
def OpDivide(x, y):
    r""" Divide y by x (the second argument by the first) """
    result = y / x
    loggerDag.info(f"{y} / {x} = {result}")
    return result

@job
def arithmetic():
    r""" The four arithmetic operations """
    seed = OpSeed()
    addResult = OpAdd(seed)
    subResult = OpSubtract(addResult)
    mulResult = OpMultiply(addResult)
    OpDivide(subResult, mulResult)

if __name__ == "__main__":
    result = arithmetic.execute_in_process(run_config={"loggers": {"console": {"config": {"log_level": "info"}}}})
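
To sanity-check what the job should compute, the same dataflow can be traced without Dagster. The sketch below is a plain-Python mirror of the ops above; the snake_case function names and `trace_arithmetic` are hypothetical and not part of the original script. It only shows how values flow along the graph edges and says nothing about how Dagster schedules the ops.

```python
# Plain-Python mirror of the job's dataflow (no Dagster required).
# Every name below is a hypothetical stand-in for an op in test_dagster.py.

def seed():
    # Same initial pair as OpSeed.
    return (2, 1)

def add(pair):
    x, y = pair
    return x + y

def subtract(x):
    return x - 1

def multiply(x):
    return x * 2

def divide(x, y):
    # Same argument order as OpDivide: the second argument is divided by the first.
    return y / x

def trace_arithmetic():
    """Evaluate the graph in dependency order and return every intermediate value."""
    add_result = add(seed())
    sub_result = subtract(add_result)   # depends only on add
    mul_result = multiply(add_result)   # depends only on add
    div_result = divide(sub_result, mul_result)
    return {
        "add": add_result,
        "subtract": sub_result,
        "multiply": mul_result,
        "divide": div_result,
    }

if __name__ == "__main__":
    print(trace_arithmetic())
```

The intermediate values can be compared against the messages the ops log when the job runs.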

Running directly

  • Run the script directly with the Python interpreter

poetry run python test_dagster.py
  • Output

2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpAdd - 2 + 1 = 3
2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpMultiply - 3 * 2 = 6
2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpSubtract - 3 - 1 = 2
2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpDivide - 6 / 2 = 3.0
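
Each console line above appears to consist of seven fields separated by " - ". A minimal sketch of splitting such a line follows; the field names (`logger`, `job`, `run_id`, `op`) are my reading of the sample output rather than a documented format, and `parse_log_line` is a hypothetical helper.

```python
from datetime import datetime
from typing import NamedTuple

class LogRecord(NamedTuple):
    # Field names are assumptions inferred from the sample output.
    timestamp: datetime
    logger: str
    level: str
    job: str
    run_id: str
    op: str
    message: str

def parse_log_line(line: str) -> LogRecord:
    """Split one console log line into its seven fields."""
    # maxsplit=6 keeps any " - " inside the message (e.g. "3 - 1 = 2") intact.
    parts = line.strip().split(" - ", 6)
    if len(parts) != 7:
        raise ValueError(f"unexpected log line: {line!r}")
    ts, logger, level, job, run_id, op, message = parts
    timestamp = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S %z")
    return LogRecord(timestamp, logger, level, job, run_id, op, message)

SAMPLE = (
    "2022-01-11 16:29:48 +0800 - dagster - INFO - arithmetic - "
    "be4946d9-c8ec-4b4a-bebb-a09d47d1b231 - OpSubtract - 3 - 1 = 2"
)

if __name__ == "__main__":
    record = parse_log_line(SAMPLE)
    print(record.op, "->", record.message)
```

Limiting the number of splits matters here because the subtraction message itself contains the separator.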

Running with the dagster CLI

  • Create the configuration file run_config.yaml

loggers:
  console:
    config:
      log_level: INFO
  • Run the command

poetry run dagster job execute -f test_dagster.py -c run_config.yaml
  • Output

To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.

2022-01-11 16:35:25 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpAdd - 2 + 1 = 3
2022-01-11 16:35:30 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpSubtract - 3 - 1 = 2
2022-01-11 16:35:30 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpMultiply - 3 * 2 = 6
2022-01-11 16:35:35 +0800 - dagster - INFO - arithmetic - b14bd796-f0dc-4f79-91a8-6d8a8689b105 - OpDivide - 6 / 2 = 3.0
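
OpSubtract and OpMultiply are logged in a different order here than in the direct run. Both depend only on OpAdd and not on each other, so either order respects the job's dependencies. A minimal sketch in plain Python that checks an execution order against the graph; the `DEPS` table is transcribed from the job definition, while `respects_dependencies` and the order lists are hypothetical names introduced for illustration.

```python
# Upstream dependencies of each op, transcribed from the `arithmetic` job.
DEPS = {
    "OpSeed": set(),
    "OpAdd": {"OpSeed"},
    "OpSubtract": {"OpAdd"},
    "OpMultiply": {"OpAdd"},
    "OpDivide": {"OpSubtract", "OpMultiply"},
}

def respects_dependencies(order, deps=DEPS):
    """Return True if the order covers every op and each op follows all its upstream ops."""
    seen = set()
    for op in order:
        if not deps[op] <= seen:  # an upstream op has not run yet
            return False
        seen.add(op)
    return seen == set(deps)

# OpSeed logs nothing, so it is added at the front of both observed orders.
DIRECT_RUN_ORDER = ["OpSeed", "OpAdd", "OpMultiply", "OpSubtract", "OpDivide"]
CLI_RUN_ORDER = ["OpSeed", "OpAdd", "OpSubtract", "OpMultiply", "OpDivide"]
INVALID_ORDER = ["OpSeed", "OpAdd", "OpSubtract", "OpDivide", "OpMultiply"]

if __name__ == "__main__":
    for name, order in [
        ("direct", DIRECT_RUN_ORDER),
        ("cli", CLI_RUN_ORDER),
        ("invalid", INVALID_ORDER),
    ]:
        print(name, respects_dependencies(order))
```

This only validates an order after the fact; which of the valid orders actually occurs is up to the executor.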

Running with dagit

  • Run the command

poetry run dagit -f test_dagster.py
  • Open the address that dagit prints on startup in a browser

  • Open the job's launch tab and click its run button to start a run; the result is shown in the browser
