Prefect 是一个用于数据密集型工作流程编排的工具。它能够最简单地将任何 Python 函数转变成一个可被观察和编排的工作单元。通过 Prefect 可以构建弹性的、动态的工作流程,使其能够对周围的环境做出反应,并能从意外变化中恢复过来。通过添加装饰器,Prefect 提供了自动重试、分布式执行、调度、缓存等诸多增强功能。每个任务都能被追踪,并可通过 Prefect 服务器或 Prefect Cloud 仪表板进行监控。
from prefect import flow, task
from typing import List
import httpx
@task(retries=3)
def get_stars(repo: str):
url = f"https://api.github.com/repos/{repo}"
count = httpx.get(url).json()["stargazers_count"]
print(f"{repo} has {count} stars!")
@flow(name="GitHub Stars")
def github_stars(repos: List[str]):
for repo in repos:
get_stars(repo)
# run the flow!
github_stars(["PrefectHQ/Prefect"])
运行一些流程后,打开 Prefect UI 查看:
prefect server start
这是将任何 Python 函数转换为可观察和编排的工作单元的最简单方法。
快速入门
第 1 步:安装
pip install -U prefect
第 2 步:连接 Prefect 的 API
注册永久免费的 Prefect Cloud 帐户,或者使用 Prefect Cloud 的部分功能托管个人服务器。
-
如果使用 Prefect Cloud,请使用现有帐户登录或在 https://app.prefect.cloud/
创建新帐户。 -
如果设置新帐户,请为帐户创建一个工作区。 -
使用 prefect cloud login
Prefect CLI 命令从你的环境登录 Prefect Cloud 。
prefect cloud login
第 3 步:创建流
开始使用 Prefect 的最快方法是@flow
向任何 python 函数添加装饰器。以下是一个示例流程Repo Info
,其中包含两个任务:
# my_flow.py
import httpx
from prefect import flow, task
@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
""" Get info about a repo - will retry twice after failing """
url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
api_response = httpx.get(url)
api_response.raise_for_status()
repo_info = api_response.json()
return repo_info
@task
def get_contributors(repo_info: dict):
contributors_url = repo_info["contributors_url"]
response = httpx.get(contributors_url)
response.raise_for_status()
contributors = response.json()
return contributors
@flow(name="Repo Info", log_prints=True)
def repo_info(
repo_owner: str = "PrefectHQ", repo_name: str = "prefect"
):
# call our `get_repo_info` task
repo_info = get_repo_info(repo_owner, repo_name)
print(f"Stars 🌠 : {repo_info['stargazers_count']}")
# call our `get_contributors` task,
# passing in the upstream result
contributors = get_contributors(repo_info)
print(
f"Number of contributors 👷: {len(contributors)}"
)
if __name__ == "__main__":
# Call a flow function for a local flow run!
repo_info()
第 4 步:在本地运行流程
调用使用@flow
装饰器装饰的任何函数以查看流程运行的本地实例。
python my_flow.py
18:21:40.235 | INFO | prefect.engine - Created flow run 'fine-gorilla' for flow 'Repo Info'
18:21:40.237 | INFO | Flow run 'fine-gorilla' - View at https://app.prefect.cloud/account/0ff44498-d380-4d7b-bd68-9b52da03823f/workspace/c859e5b6-1539-4c77-81e0-444c2ddcaafe/flow-runs/flow-run/c5a85193-69ea-4577-aa0e-e11adbf1f659
18:21:40.837 | INFO | Flow run 'fine-gorilla' - Created task run 'get_repo_info-0' for task 'get_repo_info'
18:21:40.838 | INFO | Flow run 'fine-gorilla' - Executing 'get_repo_info-0' immediately...
18:21:41.468 | INFO | Task run 'get_repo_info-0' - Finished in state Completed()
18:21:41.477 | INFO | Flow run 'fine-gorilla' - Stars 🌠 : 12340
18:21:41.606 | INFO | Flow run 'fine-gorilla' - Created task run 'get_contributors-0' for task 'get_contributors'
18:21:41.607 | INFO | Flow run 'fine-gorilla' - Executing 'get_contributors-0' immediately...
18:21:42.225 | INFO | Task run 'get_contributors-0' - Finished in state Completed()
18:21:42.232 | INFO | Flow run 'fine-gorilla' - Number of contributors 👷: 30
18:21:42.383 | INFO | Flow run 'fine-gorilla' - Finished in state Completed('All states completed.')
流程日志顶部的链接将引导至运行页面。本地流程运行执行非常适合开发和测试,但要
安排流程运行
或根据事件触发
,则需要部署流程。
第 5 步:部署流程
注意:始终从存储库的根级别运行
prefect deploy
命令!
当运行deploy
命令时,Prefect 将自动检测存储库中定义的任何流。选择想要部署的一个。然后,按照向导
为部署命名,添加可选计划,创建工作池,可选配置远程流代码存储等等!
prefect deploy
保存部署的配置将生成一个
prefect.yaml
用你的第一个部署填充的文件。可以使用此 YAML 文件来编辑和定义此存储库的多个部署。
第 6 步:启动工作程序并运行已部署的流程
在新终端中运行:
prefect worker start --pool '<work-pool-name>'
工作线程运行后,可以从 UI 或通过运行以下命令来启动已部署的流程运行:
prefect deployment run '<flow-name>/<deployment-name>'
从 UI 中的流程运行
页面或工作日志中查看此流程运行的日志。至此已经成功部署流程运行!
传送门
开源协议:Apache2.0
开源地址:https://github.com/PrefectHQ/prefect
项目合集:https://github.com/RepositorySheet
-END-
原文始发于微信公众号(开源技术专栏):将任何 Python 函数转变成一个可被观察和编排的流程
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/166767.html