无限可扩展、事件驱动、与语言无关的流程编排平台

无限可扩展、事件驱动、与语言无关的流程编排平台

Kestra 是一个通用的开源编排器,旨在使定期计划和事件驱动的工作流程变得简单。

Kestra 通过用户界面提供了简便的工作流创建方式,使用声明性的 YAML 界面来定义编排逻辑,使业务利益相关者能够参与工作流创建过程。此外,Kestra 提供了多用途、与编程语言无关的开发工具,并为业务专业人员提供了直观的用户界面。无论通过用户界面、CI/CD、Terraform 还是 API 调用以其他方式修改工作流组件,Kestra 都能自动调整 YAML 定义,因此编排逻辑始终以声明性代码的形式进行管理。

无限可扩展、事件驱动、与语言无关的流程编排平台

丰富的编排能力

Kestra 提供了多种任务来处理简单和复杂的业务逻辑,包括:

  • • 子流程

  • • 重试

  • • 超时处理

  • • 错误处理

  • • 条件分支

  • • 动态任务

  • • 顺序和并行任务

  • • 根据需要设置disabled标志,以跳过任务或触发器

  • • 配置任务、流程和触发器之间的依赖关系

  • • 高级调度和触发条件

  • • 回填

  • • 蓝图

  • • 通过添加 Markdown 描述来记录流程、任务和触发器的详细信息

  • • 添加标签以为流程添加额外的元数据,如流程所有者或团队:

id: getting_started
namespace: dev

description: |
  # Getting Started
  Let's `write` some **markdown** - [first flow](https://t.ly/Vemr0) 🚀

labels:
  owner: rick.astley
  project: never-gonna-give-you-up

tasks:
  - id: hello
    type: io.kestra.core.tasks.log.Log
    message: Hello world!
    description: a *very* important task
    disabled: false
    timeout: PT10M
    retry:
      type: constant # type: string
      interval: PT15M # type: Duration
      maxDuration: PT1H # type: Duration
      maxAttempt: 5 # type: int
      warningOnRetry: true # type: boolean, default is false

- id: parallel
    type: io.kestra.core.tasks.flows.Parallel
    concurrent: 3
    tasks:
      - id: task1
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - 'echo "running {{task.id}}"'
          - 'sleep 2'
      - id: task2
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - 'echo "running {{task.id}}"'
          - 'sleep 1'
      - id: task3
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - 'echo "running {{task.id}}"'
          - 'sleep 3'

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "*/15 * * * *"
    backfill:
      start: 2023-10-05T14:00:00Z

内置代码编辑器

编写工作流程时,UI 提供:

  • • 自动完成

  • • 语法验证

  • • 嵌入式插件文档

  • • 作为蓝图提供的示例流程

  • • 拓扑视图(有向无环图中的依赖关系视图)会在修改和添加新任务时实时更新

安装

环境准备

  • • Docker

  • • Docker Compose

运行 Kestra

下载 Docker Compose 文件:

curl -o docker-compose.yml https://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml

或者,可以使用:

wget https://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml.

启动 Kestra:

docker-compose up

在浏览器中打开 http://localhost:8080 启动 UI 即可创建流程。

示例

创建流程

在屏幕左侧,单击 Flows 菜单。然后,单击 Create 按钮。

无限可扩展、事件驱动、与语言无关的流程编排平台

将以下代码粘贴到流程编辑器:

id: getting_started
namespace: dev

tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: https://dummyjson.com/products

然后,点击 Save 按钮。

无限可扩展、事件驱动、与语言无关的流程编排平台

此流程有一个任务,将从 dummyjson API 获取数据。

无限可扩展、事件驱动、与语言无关的流程编排平台

添加输入参数

在流程中可以灵活地引入不同的数值,使工作流更易于适应不同情况的变化。

  • • 与任务类似,输入是键值对的列表。每个输入必须有 name 和 type。还可添加 defaults 默认值。

  • • 使用 {{ inputs.input_name }} 语法来引用流程中的输入值。

id: getting_started
namespace: dev

inputs:
  - name: api_url
    type: STRING
    defaults: https://dummyjson.com/products

tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: "{{ inputs.api_url }}"

检索输出

工作流中的任务可以产生一些结果或数据,这些结果或数据可以传递给后续的任务。

  • • 结果可以是一些变量(数据)或者是存储在内部存储系统中的文件。

  • • 使用 {{ outputs.task_id.output_property }} 语法检索任务的特定输出。

使用该 {{ outputs.api.body }} 语法在下游任务中处理获取的数据,如下面的 Python 脚本任务所示。

id: getting_started
namespace: dev

inputs:
  - name: api_url
    type: STRING
    defaults: https://dummyjson.com/products

tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: "{{ inputs.api_url }}"

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("{{outputDir}}/products.csv")

此流程处理 Polars 中的数据并将结果存储为 CSV 文件。

求值表达式

在 Outputs 选项卡上使用内置的表达式评估器来测试类似于 {{outputs.api.body | jq('.products') | first}} 这样的输出解析表达式,以确保它们按预期工作。

无限可扩展、事件驱动、与语言无关的流程编排平台

在任务之间传递数据

向流程添加另一个任务来处理 Python 脚本任务生成的 CSV 文件。使用该 io.kestra.plugin.jdbc.duckdb.Query 任务对 CSV 文件运行 SQL 查询,并将结果作为可下载的工件存储在内部存储中。

id: getting_started
namespace: dev

tasks:
  - id: api
    type: io.kestra.plugin.fs.http.Request
    uri: https://dummyjson.com/products

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:slim
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("{{outputDir}}/products.csv")

  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{workingDir}}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true

此示例流程使用输出在任务之间传递数据。inputFiles 任务的参数允许 io.kestra.plugin.jdbc.duckdb.Query 将文件从内部存储传递到任务。确保 store: true SQL 查询的结果存储在内部存储中,并且可以从 Outputs 选项卡预览和下载。

无限可扩展、事件驱动、与语言无关的流程编排平台

传送门

GitHubhttps://github.com/kestra-io/kestra


原文始发于微信公众号(开源技术专栏):无限可扩展、事件驱动、与语言无关的流程编排平台

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/166530.html

(1)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!