大家好我是栗子鑫,今天主要想给大家介绍一下FlinkAPI的一种Table API和SQL,希望对大家有所帮助,正文如下:
上一篇公众号介绍了三种API的DataStream,今天主要想介绍一下三种API最后一种API:Table API和SQL。
image-20220904211844378
Table API和SQL虽然是两个模块,其实这两者都属于Flink的Tabel模块,所以笔者将两者放在一起介绍。首先总体介绍下两者:
-
Tabel API:是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,逻辑上非常直观和方便; -
SQL:作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手。
01
—
总体程序结构
所有用于批处理和流式处理的 Table API 和 SQL 程序都遵循相同的模式。以下代码示例显示了 Table API 和 SQL 程序的常见结构。
val tableEnv = ... // 创建表环境
// 创建表
tableEnv.connect(...).createTemporaryTable("table1")
// 注册输出表
tableEnv.connect(...).createTemporaryTable("outputTable")
// 使用 Table API query 创建表
val tapiResult = tableEnv.from("table1").select(...)
// 使用 SQL query 创建表
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
// 输出一张结果表到 TableSink,SQL查询的结果表也一样
TableResult tableResult = tapiResult.executeInsert("outputTable");
tableResult...
// 执行
tableEnv.execute("scala_job")可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。
下面对上述代码分步解释
02
—
创建表环境
TableEnvironment
是Table API & SQL编程中的核心,其主要功能为:
1.向目录(Catalog)中注册Table
或者从中获取Table
2.执行 SQL 查询
3.注册自定义函数 (scalar、table 或 aggregation)
4.将 DataStream 或 DataSet 转换成 Table
在创建TableEnv的时候,可以多传入一个参数,可以用来配置TableEnvironment的一些特性。Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表
,例如,对它们进行 join 或 union 操作。
TableEnvironment 维护着一个由标识符(identifier)创建的表catalog
的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明,就会使用当前默认值。Table 可以是虚拟的比如视图也可以是常规的表 。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。表TABLES描述的是外部数据,例如文件、数据库表或者消息队列。
表又分为临时表和永久表:
临时表的生命周期与创建它的Flink的session的生命周期相关,这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。
永久表需要 catalog以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。
创建虚拟表
一个Table
API 对象对应VIEW
于 SQL 术语中的一个(虚拟表)。它封装了一个逻辑查询计划。它可以在目录中创建,如下所示:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);也可以
TABLE
从连接器[1]声明中创建关系数据库中已知的。tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
03
—
表查询
利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。Flink给我们提供了两种查询方式:Table API和 SQL。
Table API查询:
Table集成在Scala和Java语言内的查询API。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。
// 获取表环境
TableEnvironment tableEnv = ...;
// 注册表
Table orders = tableEnv.from("Orders");
//计算 revenue 对于来自FRANCE的顾客
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName")
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
SQL查询:
Flink的SQL集成,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。
TableEnvironment tableEnv = ...;
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
04
—
DataStream/DataSet和表数据相互转化
Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义schema了。
这里内容比较多笔者不进行详细的介绍了,如果感兴趣的详细查看官网教程[2]。
05
—
写出表
Table
通过TableSink来写入。TableSink
是一个通用接口,支持多种文件格式(例如 CSV、Apache Parquet、Apache Avro)、存储系统(例如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息系统(例如 Apache Kafka、RabbitMQ )。批处理Table
只能写入 BatchTableSink
,而流式传输Table
需要AppendStreamTableSink
、 RetractStreamTableSink
或UpsertStreamTableSink
.
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// 创建输出表
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT());
tableEnv.connect(new FileSystem().path("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");
06
—
总结
上述笔者整体介绍了Table API 和SQL整个流程,具体细节并没有进行介绍,后期会更加详细的分段介绍。希望对你们能有所帮助。
关注六只栗子,面试不迷路!
参考资料
连接器: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connect.html
[2]
官网教程: https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#structure-of-table-api-and-sql-programs
作者 栗子鑫
编辑 一口栗子
原文始发于微信公众号(六只栗子):浅谈Flink(四)之Table API和SQL
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/88291.html