
Kestra 是一个通用的开源编排器,旨在使定期计划和事件驱动的工作流程变得简单。
Kestra 通过用户界面提供了简便的工作流创建方式,使用声明性的 YAML 界面来定义编排逻辑,使业务利益相关者能够参与工作流创建过程。此外,Kestra 提供了多用途、与编程语言无关的开发工具,并为业务专业人员提供了直观的用户界面。无论通过用户界面、CI/CD、Terraform 还是 API 调用以其他方式修改工作流组件,Kestra 都能自动调整 YAML 定义,因此编排逻辑始终以声明性代码的形式进行管理。

丰富的编排能力
Kestra 提供了多种任务来处理简单和复杂的业务逻辑,包括:
-
• 子流程
-
• 重试
-
• 超时处理
-
• 错误处理
-
• 条件分支
-
• 动态任务
-
• 顺序和并行任务
-
• 根据需要设置
disabled
标志,以跳过任务或触发器 -
• 配置任务、流程和触发器之间的依赖关系
-
• 高级调度和触发条件
-
• 回填
-
• 蓝图
-
• 通过添加 Markdown 描述来记录流程、任务和触发器的详细信息
-
• 添加标签以为流程添加额外的元数据,如流程所有者或团队:
id: getting_started
namespace: dev
description: |
# Getting Started
Let's `write` some **markdown** - [first flow](https://t.ly/Vemr0) 🚀
labels:
owner: rick.astley
project: never-gonna-give-you-up
tasks:
- id: hello
type: io.kestra.core.tasks.log.Log
message: Hello world!
description: a *very* important task
disabled: false
timeout: PT10M
retry:
type: constant # type: string
interval: PT15M # type: Duration
maxDuration: PT1H # type: Duration
maxAttempt: 5 # type: int
warningOnRetry: true # type: boolean, default is false
- id: parallel
type: io.kestra.core.tasks.flows.Parallel
concurrent: 3
tasks:
- id: task1
type: io.kestra.plugin.scripts.shell.Commands
commands:
- 'echo "running {{task.id}}"'
- 'sleep 2'
- id: task2
type: io.kestra.plugin.scripts.shell.Commands
commands:
- 'echo "running {{task.id}}"'
- 'sleep 1'
- id: task3
type: io.kestra.plugin.scripts.shell.Commands
commands:
- 'echo "running {{task.id}}"'
- 'sleep 3'
triggers:
- id: schedule
type: io.kestra.core.models.triggers.types.Schedule
cron: "*/15 * * * *"
backfill:
start: 2023-10-05T14:00:00Z
内置代码编辑器
编写工作流程时,UI 提供:
-
• 自动完成
-
• 语法验证
-
• 嵌入式插件文档
-
• 作为蓝图提供的示例流程
-
• 拓扑视图(有向无环图中的依赖关系视图)会在修改和添加新任务时实时更新
安装
环境准备
-
• Docker
-
• Docker Compose
运行 Kestra
下载 Docker Compose 文件:
curl -o docker-compose.yml https://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml
或者,可以使用:
wget https://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml.
启动 Kestra:
docker-compose up
在浏览器中打开 http://localhost:8080
启动 UI 即可创建流程。
示例
创建流程
在屏幕左侧,单击 Flows 菜单。然后,单击 Create 按钮。

将以下代码粘贴到流程编辑器:
id: getting_started
namespace: dev
tasks:
- id: api
type: io.kestra.plugin.fs.http.Request
uri: https://dummyjson.com/products
然后,点击 Save 按钮。

此流程有一个任务,将从 dummyjson API 获取数据。

添加输入参数
在流程中可以灵活地引入不同的数值,使工作流更易于适应不同情况的变化。
-
• 与任务类似,输入是键值对的列表。每个输入必须有
name
和type
。还可添加defaults
默认值。 -
• 使用
{{ inputs.input_name }}
语法来引用流程中的输入值。
id: getting_started
namespace: dev
inputs:
- name: api_url
type: STRING
defaults: https://dummyjson.com/products
tasks:
- id: api
type: io.kestra.plugin.fs.http.Request
uri: "{{ inputs.api_url }}"
检索输出
工作流中的任务可以产生一些结果或数据,这些结果或数据可以传递给后续的任务。
-
• 结果可以是一些变量(数据)或者是存储在内部存储系统中的文件。
-
• 使用
{{ outputs.task_id.output_property }}
语法检索任务的特定输出。
使用该 {{ outputs.api.body }}
语法在下游任务中处理获取的数据,如下面的 Python 脚本任务所示。
id: getting_started
namespace: dev
inputs:
- name: api_url
type: STRING
defaults: https://dummyjson.com/products
tasks:
- id: api
type: io.kestra.plugin.fs.http.Request
uri: "{{ inputs.api_url }}"
- id: python
type: io.kestra.plugin.scripts.python.Script
docker:
image: python:slim
beforeCommands:
- pip install polars
warningOnStdErr: false
script: |
import polars as pl
data = {{outputs.api.body | jq('.products') | first}}
df = pl.from_dicts(data)
df.glimpse()
df.select(["brand", "price"]).write_csv("{{outputDir}}/products.csv")
此流程处理 Polars 中的数据并将结果存储为 CSV 文件。
求值表达式
在 Outputs 选项卡上使用内置的表达式评估器来测试类似于 {{outputs.api.body | jq('.products') | first}}
这样的输出解析表达式,以确保它们按预期工作。

在任务之间传递数据
向流程添加另一个任务来处理 Python 脚本任务生成的 CSV 文件。使用该 io.kestra.plugin.jdbc.duckdb.Query
任务对 CSV 文件运行 SQL 查询,并将结果作为可下载的工件存储在内部存储中。
id: getting_started
namespace: dev
tasks:
- id: api
type: io.kestra.plugin.fs.http.Request
uri: https://dummyjson.com/products
- id: python
type: io.kestra.plugin.scripts.python.Script
docker:
image: python:slim
beforeCommands:
- pip install polars
warningOnStdErr: false
script: |
import polars as pl
data = {{outputs.api.body | jq('.products') | first}}
df = pl.from_dicts(data)
df.glimpse()
df.select(["brand", "price"]).write_csv("{{outputDir}}/products.csv")
- id: sqlQuery
type: io.kestra.plugin.jdbc.duckdb.Query
inputFiles:
in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
sql: |
SELECT brand, round(avg(price), 2) as avg_price
FROM read_csv_auto('{{workingDir}}/in.csv', header=True)
GROUP BY brand
ORDER BY avg_price DESC;
store: true
此示例流程使用输出在任务之间传递数据。inputFiles
任务的参数允许 io.kestra.plugin.jdbc.duckdb.Query
将文件从内部存储传递到任务。确保 store: true
SQL 查询的结果存储在内部存储中,并且可以从 Outputs 选项卡预览和下载。

传送门
GitHub:https://github.com/kestra-io/kestra
原文始发于微信公众号(开源技术专栏):无限可扩展、事件驱动、与语言无关的流程编排平台
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/166530.html