将任何 Python 函数转变成一个可被观察和编排的流程

将任何 Python 函数转变成一个可被观察和编排的流程Prefect 是一个用于数据密集型工作流程编排的工具。它能够最简单地将任何 Python 函数转变成一个可被观察和编排的工作单元。通过 Prefect 可以构建弹性的、动态的工作流程,使其能够对周围的环境做出反应,并能从意外变化中恢复过来。通过添加装饰器,Prefect 提供了自动重试、分布式执行、调度、缓存等诸多增强功能。每个任务都能被追踪,并可通过 Prefect 服务器或 Prefect Cloud 仪表板进行监控。

from prefect import flow, task
from typing import List
import httpx


@task(retries=3)
def get_stars(repo: str):
    url = f"https://api.github.com/repos/{repo}"
    count = httpx.get(url).json()["stargazers_count"]
    print(f"{repo} has {count} stars!")


@flow(name="GitHub Stars")
def github_stars(repos: List[str]):
    for repo in repos:
        get_stars(repo)


# run the flow!
github_stars(["PrefectHQ/Prefect"])

运行一些流程后,打开 Prefect UI 查看:

prefect server start

这是将任何 Python 函数转换为可观察和编排的工作单元的最简单方法。将任何 Python 函数转变成一个可被观察和编排的流程

快速入门

第 1 步:安装

pip install -U prefect

第 2 步:连接 Prefect 的 API

注册永久免费的 Prefect Cloud 帐户,或者使用 Prefect Cloud 的部分功能托管个人服务器。

  1. 如果使用 Prefect Cloud,请使用现有帐户登录或在https://app.prefect.cloud/创建新帐户。
  2. 如果设置新帐户,请为帐户创建一个工作区。
  3. 使用prefect cloud login Prefect CLI 命令从你的环境登录 Prefect Cloud 。
prefect cloud login

第 3 步:创建流

开始使用 Prefect 的最快方法是@flow向任何 python 函数添加装饰器。以下是一个示例流程Repo Info,其中包含两个任务:

# my_flow.py
import httpx
from prefect import flow, task

@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
    """ Get info about a repo - will retry twice after failing """
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    api_response.raise_for_status()
    repo_info = api_response.json()
    return repo_info


@task
def get_contributors(repo_info: dict):
    contributors_url = repo_info["contributors_url"]
    response = httpx.get(contributors_url)
    response.raise_for_status()
    contributors = response.json()
    return contributors

@flow(name="Repo Info", log_prints=True)
def repo_info(
    repo_owner: str = "PrefectHQ", repo_name: str = "prefect"
):
    # call our `get_repo_info` task
    repo_info = get_repo_info(repo_owner, repo_name)
    print(f"Stars 🌠 : {repo_info['stargazers_count']}")

    # call our `get_contributors` task,
    # passing in the upstream result
    contributors = get_contributors(repo_info)
    print(
        f"Number of contributors 👷: {len(contributors)}"
    )


if __name__ == "__main__":
    # Call a flow function for a local flow run!
    repo_info()

第 4 步:在本地运行流程

调用使用@flow装饰器装饰的任何函数以查看流程运行的本地实例。

python my_flow.py
18:21:40.235 | INFO    | prefect.engine - Created flow run 'fine-gorilla' for flow 'Repo Info'
18:21:40.237 | INFO    | Flow run 'fine-gorilla' - View at https://app.prefect.cloud/account/0ff44498-d380-4d7b-bd68-9b52da03823f/workspace/c859e5b6-1539-4c77-81e0-444c2ddcaafe/flow-runs/flow-run/c5a85193-69ea-4577-aa0e-e11adbf1f659
18:21:40.837 | INFO    | Flow run 'fine-gorilla' - Created task run 'get_repo_info-0' for task 'get_repo_info'
18:21:40.838 | INFO    | Flow run 'fine-gorilla' - Executing 'get_repo_info-0' immediately...
18:21:41.468 | INFO    | Task run 'get_repo_info-0' - Finished in state Completed()
18:21:41.477 | INFO    | Flow run 'fine-gorilla' - Stars 🌠 : 12340
18:21:41.606 | INFO    | Flow run 'fine-gorilla' - Created task run 'get_contributors-0' for task 'get_contributors'
18:21:41.607 | INFO    | Flow run 'fine-gorilla' - Executing 'get_contributors-0' immediately...
18:21:42.225 | INFO    | Task run 'get_contributors-0' - Finished in state Completed()
18:21:42.232 | INFO    | Flow run 'fine-gorilla' - Number of contributors 👷: 30
18:21:42.383 | INFO    | Flow run 'fine-gorilla' - Finished in state Completed('All states completed.')

流程日志顶部的链接将引导至运行页面。将任何 Python 函数转变成一个可被观察和编排的流程本地流程运行执行非常适合开发和测试,但要安排流程运行根据事件触发,则需要部署流程。

第 5 步:部署流程

注意:始终从存储库的根级别运行prefect deploy命令!

当运行deploy命令时,Prefect 将自动检测存储库中定义的任何流。选择想要部署的一个。然后,按照向导为部署命名,添加可选计划,创建工作池,可选配置远程流代码存储等等!

prefect deploy

保存部署的配置将生成一个prefect.yaml用你的第一个部署填充的文件。可以使用此 YAML 文件来编辑和定义此存储库的多个部署。

第 6 步:启动工作程序并运行已部署的流程

在新终端中运行:

prefect worker start --pool '<work-pool-name>'

工作线程运行后,可以从 UI 或通过运行以下命令来启动已部署的流程运行:

prefect deployment run '<flow-name>/<deployment-name>'

从 UI 中的流程运行页面或工作日志中查看此流程运行的日志。至此已经成功部署流程运行!

传送门

开源协议:Apache2.0

开源地址:https://github.com/PrefectHQ/prefect

项目合集:https://github.com/RepositorySheet

-END-


原文始发于微信公众号(开源技术专栏):将任何 Python 函数转变成一个可被观察和编排的流程

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/166767.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!