
Workflow: Basic Usage of Luigi 3.x (qbit)

Preface

  • Software versions

Python 3.8
poetry 1.1.7
luigi  3.0.3

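Installation

  • After initializing the project with poetry, add the following dependencies to the project file, then run the install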

# Package mirror in mainland China (optional)
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true

[tool.poetry.dependencies]
python = "^3.8"
luigi = "~3.0.3"
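
To confirm that the install picked up the pinned version, a small check like the one below may be useful. It is a minimal sketch built on the standard-library `importlib.metadata` module (available since Python 3.8); the helper name `installed_version` is hypothetical and belongs to neither luigi nor poetry.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # With the dependencies above installed, a 3.0.x version is expected here.
    print("luigi:", installed_version("luigi"))
```

Run it inside the project's virtual environment (for example through `poetry run python`) so that it inspects the packages poetry installed.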

Configuration file

  • Set the environment variables

# Linux
export LUIGI_CONFIG_PATH=my_path/luigi.toml
export LUIGI_CONFIG_PARSER=toml
# Windows (make sure there are no trailing spaces at the end of each line)
set LUIGI_CONFIG_PATH=my_path/luigi.toml
set LUIGI_CONFIG_PARSER=toml
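  • Edit the configuration file (the luigi.toml that LUIGI_CONFIG_PATH points to); for the full list of options, see the Luigi configuration documentation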

[core]
default_scheduler_host = '192.168.1.101'
default_scheduler_port = 8082
default_scheduler_url = 'http://192.168.1.101:8082/'

[scheduler]
# State file that luigid saves when it shuts down
state_path = 'my_path/state.pickle'
# How long after a task finishes before it is removed, in seconds (default: 10 minutes)
remove_delay = 3600
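
Because a trailing space or a missing variable is easy to overlook, a quick sanity check of the environment can help before starting luigi. The following is a minimal sketch using only the standard library; the helper name `check_luigi_env` is hypothetical and is not part of luigi. It assumes the two variables are set exactly as in the commands above and that the configuration file is a TOML file.

```python
import os
from typing import Dict, List

# Variable names taken from the export/set commands above.
REQUIRED_VARS = ("LUIGI_CONFIG_PATH", "LUIGI_CONFIG_PARSER")


def check_luigi_env(env: Dict[str, str]) -> List[str]:
    """Return a list of problems found; an empty list means the environment looks fine."""
    problems = []
    for name in REQUIRED_VARS:
        value = env.get(name)
        if not value:
            problems.append(f"{name} is not set")
        elif value != value.strip():
            # On Windows, `set NAME=value ` keeps the trailing space in the value.
            problems.append(f"{name} has leading or trailing whitespace: {value!r}")

    parser = env.get("LUIGI_CONFIG_PARSER", "").strip()
    if parser and parser != "toml":
        problems.append(f"LUIGI_CONFIG_PARSER is {parser!r}, expected 'toml'")

    path = env.get("LUIGI_CONFIG_PATH", "").strip()
    if path and not os.path.isfile(path):
        problems.append(f"config file not found: {path}")
    return problems


if __name__ == "__main__":
    for problem in check_luigi_env(dict(os.environ)):
        print("warning:", problem)
```

Running this in the same shell that will launch luigi prints one warning per problem and nothing when both variables look correct.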

Test code

# encoding: utf-8
# author: qbit
# date: 2022-01-10
# summary: test luigi with addition, subtraction, multiplication, and division

import luigi

class TaskAdd(luigi.Task):
    r""" Read the parameters and add them """
    x = luigi.IntParameter()
    y = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget(f'add_{self.x}_{self.y}.result')

    def run(self):
        with self.output().open('w') as f:
            f.write(str(self.x + self.y))

class TaskSubtract(luigi.Task):
    r""" Subtract 1 from the result of the addition """
    def requires(self):
        return TaskAdd(2, 1)

    def output(self):
        return luigi.LocalTarget('subtract.result')

    def run(self):
        with self.input().open('r') as infile:
            num = int(infile.read().strip())

        with self.output().open('w') as outfile:
            outfile.write(str(num-1))

class TaskMultiply(luigi.Task):
    r""" Multiply the result of the addition by 2 """
    def requires(self):
        return TaskAdd(2, 1)

    def output(self):
        return luigi.LocalTarget('multiply.result')

    def run(self):
        with self.input().open('r') as infile:
            num = int(infile.read().strip())

        with self.output().open('w') as outfile:
            outfile.write(str(num*2))

class TaskDivide(luigi.Task):
    r""" Divide the result of the multiplication by the result of the subtraction """
    def requires(self):
        return {'x': TaskSubtract(), 'y': TaskMultiply()}

    def output(self):
        return luigi.LocalTarget('divide.result')

    def run(self):
        with self.input()['x'].open('r') as infile:
            x = int(infile.read().strip())

        with self.input()['y'].open('r') as infile:
            y = int(infile.read().strip())

        with self.output().open('w') as outfile:
            outfile.write(str(y / x))

if __name__ == '__main__':
    luigi.build([TaskDivide()], local_scheduler=False)
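
To make the task graph above easier to follow, the sketch below replays it in plain Python without luigi. It copies the dependency edges from the `requires()` methods and mirrors the arithmetic of each `run()` method. The names `REQUIRES`, `run_order`, and `expected_results` are hypothetical helpers for illustration only. The order shown is one valid order; the scheduler may run TaskSubtract and TaskMultiply in either order.

```python
from typing import Dict, List, Optional

# Dependency edges copied from the requires() methods above.
REQUIRES: Dict[str, List[str]] = {
    "TaskAdd": [],
    "TaskSubtract": ["TaskAdd"],
    "TaskMultiply": ["TaskAdd"],
    "TaskDivide": ["TaskSubtract", "TaskMultiply"],
}


def run_order(target: str, done: Optional[List[str]] = None) -> List[str]:
    """Depth-first walk: every requirement is listed before the task that needs it."""
    if done is None:
        done = []
    for dep in REQUIRES[target]:
        run_order(dep, done)
    if target not in done:
        done.append(target)
    return done


def expected_results(x: int = 2, y: int = 1) -> Dict[str, str]:
    """Mirror each run() method and return the text each result file would hold."""
    add = x + y                   # TaskAdd(2, 1)
    subtract = add - 1            # TaskSubtract
    multiply = add * 2            # TaskMultiply
    divide = multiply / subtract  # TaskDivide: true division, so the result is a float
    return {
        f"add_{x}_{y}.result": str(add),
        "subtract.result": str(subtract),
        "multiply.result": str(multiply),
        "divide.result": str(divide),
    }


if __name__ == "__main__":
    print(run_order("TaskDivide"))
    print(expected_results())
```

Note that TaskAdd appears only once in the order even though two tasks require it, and that `divide.result` holds `3.0` rather than `3` because `/` is true division in Python 3.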

Run

  • Start the scheduler daemon

poetry run luigid
  • Run the tasks

poetry run python .\luigi_test.py
  • Open the scheduler's web interface (the default_scheduler_url configured above) to view the tasks
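
By default luigi treats a task as complete once its output target exists, so running the script a second time finds nothing left to do. To run the whole chain again, the result files have to be removed first. The following is a minimal sketch of such a cleanup; `remove_results` is a hypothetical helper. It assumes the result files sit in the working directory and that no other files ending in `.result` live there, since it deletes every match.

```python
from pathlib import Path
from typing import List


def remove_results(directory: str = ".") -> List[str]:
    """Delete every *.result file in `directory` and return the removed names, sorted."""
    removed = []
    for path in sorted(Path(directory).glob("*.result")):
        if path.is_file():
            path.unlink()
            removed.append(path.name)
    return removed


if __name__ == "__main__":
    print("removed:", remove_results())
```

After the cleanup, running the tasks again recomputes all four result files.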

This article comes from