workflow 之 Luigi 3.x 基本用法(qbit)
前言
软件版本
Python 3.8
poetry 1.1.7
luigi 3.0.3
安装
用 初始化项目后在 添加以下依赖,然后运行
# 国内镜像源(可选)
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true
[tool.poetry.dependencies]
python = "^3.8"
luigi = "~3.0.3"
配置文件
设置环境变量
# Linux
export LUIGI_CONFIG_PATH=my_path/luigi.toml
export LUIGI_CONFIG_PARSER=toml
# Windows(注意行尾不要有空格)
set LUIGI_CONFIG_PATH=my_path/luigi.toml
set LUIGI_CONFIG_PARSER=toml
[core]
default_scheduler_host = '192.168.1.101'
default_scheduler_port = 8082
default_scheduler_url = 'http://192.168.1.101:8082/'
[scheduler]
# luigid 终止后保存的状态文件
state_path = 'my_path/state.pickle'
# 任务终止后多久移除(默认 10 分钟)
remove_delay = 3600
测试代码
# encoding: utf-8
# author: qbit
# date: 2022-01-10
# summary: 测试 luigi,加减乘除
import luigi
class TaskAdd(luigi.Task):
r""" 读入参数做加法 """
x = luigi.IntParameter()
y = luigi.IntParameter()
def output(self):
return luigi.LocalTarget(f'add_{self.x}_{self.y}.result')
def run(self):
with self.output().open('w') as f:
f.write(str(self.x + self.y))
class TaskSubtract(luigi.Task):
r""" 把加法的结果 -1 """
def requires(self):
return TaskAdd(2, 1)
def output(self):
return luigi.LocalTarget('subtract.result')
def run(self):
with self.input().open('r') as infile:
num = int(infile.read().strip())
with self.output().open('w') as outfile:
outfile.write(str(num-1))
class TaskMultiply(luigi.Task):
r""" 把加法的结果乘以 2 """
def requires(self):
return TaskAdd(2, 1)
def output(self):
return luigi.LocalTarget('multiply.result')
def run(self):
with self.input().open('r') as infile:
num = int(infile.read().strip())
with self.output().open('w') as outfile:
outfile.write(str(num*2))
class TaskDivide(luigi.Task):
r""" 用乘法的结果除以减法的结果 """
def requires(self):
return {'x': TaskSubtract(), 'y': TaskMultiply()}
def output(self):
return luigi.LocalTarget('divide.result')
def run(self):
with self.input()['x'].open('r') as infile:
x = int(infile.read().strip())
with self.input()['y'].open('r') as infile:
y = int(infile.read().strip())
with self.output().open('w') as outfile:
outfile.write(str(y / x))
if __name__ == '__main__':
luigi.build([TaskDivide()], local_scheduler=False)
运行
运行后台 scheduler
poetry run luigid
运行任务
poetry run python .\luigi_test.py
访问 查看任务
