阿拉平平
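When you tackle a complex job, putting all of the logic into a single task is poor practice. Splitting it into several subtasks, then orchestrating them and monitoring how they run, is far more reliable. If you are looking for a workflow engine that is pleasant to use, Prefect[1], a Python-based workflow tool, may be able to help.
In this article I introduce Prefect and demonstrate its usage by writing a simple workflow program. The article uses Python 3.6.5 and Prefect 0.13.19.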
1. Quick start
Installation is simple; run the following command:
pip install prefect
The official example code is as follows:
from prefect import task, Flow, Parameter

# log_stdout=True routes print() output into the Prefect logs
@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))

with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)

flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"
Running it produces the following output:
INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
INFO - prefect.TaskRunner | Task 'name': Starting task run...
INFO - prefect.TaskRunner | Task 'name': Finished task run for task with final state: 'Success'
INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
INFO - prefect.TaskRunner | Hello, world!
INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
INFO - prefect.TaskRunner | Task 'name': Starting task run...
INFO - prefect.TaskRunner | Task 'name': Finished task run for task with final state: 'Success'
INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
INFO - prefect.TaskRunner | Hello, Marvin!
INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
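2. A practical project
Suppose we have the following requirement: fetch the daily GitHub Trending data and save it as a CSV file. How can this be implemented? The work splits naturally into three steps:
download_data: call the API and fetch the daily GitHub Trending data.
handle_data: process the data and select the fields we need.
save_data: save the processed data as a CSV file.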
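2.1 Implementing the functions
First comes download_data. With the requests library this is not hard to implement; the code is as follows:
import requests

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

def download_data():
    # Query-string parameters sent with the GET request
    params = {'since': 'today'}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data
The response body is parsed as JSON; the data looks like this: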
{
    "count": 24,
    "msg": "suc",
    "items": [
        {
            "repo": "getmeli/meli",
            "repo_link": "https://github.com/getmeli/meli",
            "desc": "",
            "lang": "TypeScript",
            "stars": "951",
            "forks": "18",
            "added_stars": "291 stars today",
            "avatars": [
                "https://avatars3.githubusercontent.com/u/32174276?s=40&v=4",
                "https://avatars3.githubusercontent.com/u/13135149?s=40&v=4"
            ]
        },
        ... many more records
    ]
}
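Before this payload is handed to the next step, it may be worth checking that it has the expected shape. The following is a minimal sketch and not part of the original program: check_payload is a hypothetical helper, and treating "msg": "suc" as the success marker is an assumption based only on the sample response above.

```python
def check_payload(payload):
    """Return the list of records, or raise ValueError if the payload looks wrong."""
    # Assumption: "suc" marks a successful response, as in the sample above.
    if payload.get("msg") != "suc":
        raise ValueError("unexpected msg field: {!r}".format(payload.get("msg")))
    items = payload.get("items")
    if not isinstance(items, list):
        raise ValueError("items is missing or is not a list")
    return items


# A cut-down payload with the same shape as the sample response.
sample = {
    "count": 1,
    "msg": "suc",
    "items": [{"repo": "getmeli/meli", "stars": "951"}],
}
records = check_payload(sample)
print(len(records))
```

Failing early here would keep a malformed response from surfacing later as a confusing KeyError in the processing step.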
Next comes handle_data. Since only the content of items is needed, we extract it; the code is as follows:
def handle_data(data):
    # Keep only the list of repository records
    return [i for i in data["items"]]
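handle_data above passes every record through unchanged and leaves the field selection to the CSV writer. If the selection should happen in this step instead, a variant could look like the sketch below. The names select_fields and to_int, and the way added_stars is parsed, are illustrative assumptions based on the sample record; the original program does not do this.

```python
WANTED = ["repo", "repo_link", "stars", "forks", "added_stars"]


def to_int(text):
    """Parse the leading number of strings such as "951" or "291 stars today"."""
    # Assumption: the count is the first whitespace-separated token and may
    # contain thousands separators such as "1,234".
    tokens = str(text).split()
    return int(tokens[0].replace(",", "")) if tokens else 0


def select_fields(data):
    """Keep only the wanted keys and turn the count fields into integers."""
    rows = []
    for item in data["items"]:
        row = {key: item.get(key, "") for key in WANTED}
        for key in ("stars", "forks", "added_stars"):
            row[key] = to_int(row[key])
        rows.append(row)
    return rows


sample = {
    "items": [
        {
            "repo": "getmeli/meli",
            "repo_link": "https://github.com/getmeli/meli",
            "desc": "",
            "lang": "TypeScript",
            "stars": "951",
            "forks": "18",
            "added_stars": "291 stars today",
        }
    ]
}
print(select_fields(sample))
```

Storing the counts as integers would make the CSV easier to sort or aggregate later, at the cost of a step that depends on the exact text format returned by the API.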
Last comes save_data. It picks the wanted fields out of each item and saves them locally; the code is as follows:
import csv

def save_data(rows):
    headers = ["repo", "repo_link", "stars", "forks", "added_stars"]
    with open("/tmp/trending.csv", "w", newline="") as f:
        # extrasaction='ignore' drops keys that are not listed in headers
        f_csv = csv.DictWriter(f, headers, extrasaction='ignore')
        f_csv.writeheader()
        f_csv.writerows(rows)
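To see what extrasaction='ignore' does without touching the file system, the same writer can be pointed at an in-memory buffer. This is only an illustrative sketch; the row is the sample record from the API response shown earlier, minus the avatars field.

```python
import csv
import io

headers = ["repo", "repo_link", "stars", "forks", "added_stars"]
row = {
    "repo": "getmeli/meli",
    "repo_link": "https://github.com/getmeli/meli",
    "desc": "",            # not in headers, so it is dropped
    "lang": "TypeScript",  # not in headers, so it is dropped
    "stars": "951",
    "forks": "18",
    "added_stars": "291 stars today",
}

buffer = io.StringIO()
writer = csv.DictWriter(buffer, headers, extrasaction="ignore")
writer.writeheader()
writer.writerow(row)

csv_text = buffer.getvalue()
print(csv_text)
```

Without extrasaction="ignore", DictWriter raises a ValueError as soon as a row contains a key that is missing from headers, which is why save_data can accept the unfiltered records.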
2.2 Flow
With the functions in place, the steps can be arranged inside a Flow context:
from prefect import Flow

with Flow("GitHub_Trending_Flow") as flow:
    data = download_data()
    rows = handle_data(data)
    save_data(rows)

flow.run()
2.3 Tasks
The simplest way to turn a function into a Prefect task is the @task decorator; for example, to declare download_data as a task:
from prefect import task
import requests

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

@task
def download_data():
    params = {'since': 'daily'}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data
2.4 Parameters
Import Parameter and pass since in as a parameter; the code is as follows:
from prefect import task, Flow, Parameter
import requests

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

@task
def download_data(since):
    params = {'since': since}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data

with Flow("GitHub_Trending_Flow") as flow:
    since = Parameter("since")
    download_data(since)

# The parameter value is supplied when the flow is run
flow.run(since="weekly")
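Once since is a parameter, runs for different periods would all write to the same /tmp/trending.csv and overwrite one another. One possible way around that, sketched here under the assumption that a per-period file is wanted, is to derive the file name from the parameter value. csv_path_for is a hypothetical helper and does not appear in the original program.

```python
import os
import tempfile


def csv_path_for(since, directory=None):
    """Build a per-period CSV path such as <directory>/trending_weekly.csv."""
    # Hypothetical helper: the original program always writes /tmp/trending.csv.
    if directory is None:
        directory = tempfile.gettempdir()
    return os.path.join(directory, "trending_{}.csv".format(since))


print(csv_path_for("daily", "/tmp"))
print(csv_path_for("weekly", "/tmp"))
```

The save step would then need to receive since (or the resulting path) as an extra argument, so that it knows which file to open.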
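2.5 Flow orchestration
If this is the first start, run the following command to configure the local backend:
prefect backend server
This generates a configuration file under the ~/.prefect directory. After that, run the following command to start the server:
prefect server start

Then start a local agent, which picks up and executes the flow runs scheduled on the server:
prefect agent local start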
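Next, a project has to be created, which can be done from the command line:
prefect create project "GitHub_Trending"
Once the project exists, adding the following code registers the flow with the server. The project_name here must match the name of the project just created:
flow.register(project_name="GitHub_Trending")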

Next, try running the flow from the web page; just don't forget to specify the parameter value.


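3. Closing thoughts
If you are interested in this project, you may also want to read this article[3]. Its author wrote a workflow program that collects pandemic statistics and uploads them to S3, and open-sourced it on GitHub. It is a very good read.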
The complete code for the workflow built in this article:
#!/usr/bin/env python3
from prefect import task, Flow, Parameter
import requests
import csv

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

@task
def download_data(since):
    params = {'since': since}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data

@task
def handle_data(data):
    return [i for i in data["items"]]

@task
def save_data(rows):
    headers = ["repo", "repo_link", "stars", "forks", "added_stars"]
    with open("/tmp/trending.csv", "w", newline="") as f:
        f_csv = csv.DictWriter(f, headers, extrasaction='ignore')
        f_csv.writeheader()
        f_csv.writerows(rows)

with Flow("GitHub_Trending_Flow") as flow:
    since = Parameter("since")
    data = download_data(since)
    rows = handle_data(data)
    save_data(rows)

# Uncomment to run locally instead of registering with the server
#flow.run(since="weekly")
flow.register(project_name="GitHub_Trending")
References
[1] Prefect: https://github.com/PrefectHQ/prefect
[2] Documentation: https://docs.prefect.io/core/
[3] Article: https://makeitnew.io/prefect-a-modern-python-native-data-workflow-engine-7ece02ceb396
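Originally published on the WeChat official account 阿拉平平: Prefect 折腾手记:编写一个简易工作流程序 (Prefect tinkering notes: writing a simple workflow program)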