浅谈Flink(四)之Table API和SQL

浅谈Flink(四)之Table API和SQL

大家好我是栗子鑫,今天主要想给大家介绍一下FlinkAPI的一种Table API和SQL,希望对大家有所帮助,正文如下:

上一篇公众号介绍了三种API的DataStream,今天主要想介绍一下三种API最后一种API:Table API和SQL。

浅谈Flink(四)之Table API和SQL
image-20220904211844378

Table API和SQL虽然是两个模块,其实这两者都属于Flink的Tabel模块,所以笔者将两者放在一起介绍。首先总体介绍下两者:

  • Tabel API:是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,逻辑上非常直观和方便;
  • SQL:作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手。



01


总体程序结构



所有用于批处理和流式处理的 Table API 和 SQL 程序都遵循相同的模式。以下代码示例显示了 Table API 和 SQL 程序的常见结构。

val tableEnv = ... // 创建表环境
// 创建表
tableEnv.connect(...).createTemporaryTable("table1")
// 注册输出表
tableEnv.connect(...).createTemporaryTable("outputTable")
// 使用 Table API query 创建表
val tapiResult = tableEnv.from("table1").select(...)
// 使用 SQL query 创建表
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
// 输出一张结果表到 TableSink,SQL查询的结果表也一样
TableResult tableResult = tapiResult.executeInsert("outputTable");
tableResult...
// 执行
tableEnv.execute("scala_job")

可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。

下面对上述代码分步解释



02


创建表环境



TableEnvironment是Table API & SQL编程中的核心,其主要功能为:

1.向目录(Catalog)中注册Table或者从中获取Table

2.执行 SQL 查询

3.注册自定义函数 (scalar、table 或 aggregation)

4.将 DataStream 或 DataSet 转换成 Table

在创建TableEnv的时候,可以多传入一个参数,可以用来配置TableEnvironment的一些特性。Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。

TableEnvironment 维护着一个由标识符(identifier)创建的表catalog的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明,就会使用当前默认值。Table 可以是虚拟的比如视图也可以是常规的表 。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。表TABLES描述的是外部数据,例如文件、数据库表或者消息队列。

表又分为临时表和永久表:

临时表的生命周期与创建它的Flink的session的生命周期相关,这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。

永久表需要 catalog以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。

创建虚拟表

一个TableAPI 对象对应VIEW于 SQL 术语中的一个(虚拟表)。它封装了一个逻辑查询计划。它可以在目录中创建,如下所示:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query 
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

也可以TABLE连接器[1]声明中创建关系数据库中已知的。

tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")



03


表查询



利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。Flink给我们提供了两种查询方式:Table API和 SQL。

Table API查询:

Table集成在Scala和Java语言内的查询API。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。

// 获取表环境
TableEnvironment tableEnv = ...; 
// 注册表
Table orders = tableEnv.from("Orders");
//计算 revenue 对于来自FRANCE的顾客
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName")
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

SQL查询:

Flink的SQL集成,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。

TableEnvironment tableEnv = ...; 
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);



04


DataStream/DataSet和表数据相互转化



Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义schema了。

这里内容比较多笔者不进行详细的介绍了,如果感兴趣的详细查看官网教程[2]



05


写出表



Table通过TableSink来写入。TableSink是一个通用接口,支持多种文件格式(例如 CSV、Apache Parquet、Apache Avro)、存储系统(例如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息系统(例如 Apache Kafka、RabbitMQ )。批处理Table只能写入  BatchTableSink,而流式传输Table需要AppendStreamTableSinkRetractStreamTableSinkUpsertStreamTableSink.

TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// 创建输出表
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT());
tableEnv.connect(new FileSystem().path("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");



06


总结



上述笔者整体介绍了Table API 和SQL整个流程,具体细节并没有进行介绍,后期会更加详细的分段介绍。希望对你们能有所帮助。

关注六只栗子,面试不迷路!

参考资料

[1]

连接器: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connect.html

[2]

官网教程: https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#structure-of-table-api-and-sql-programs

作者    栗子鑫

编辑   一口栗子  



浅谈Flink(四)之Table API和SQL

浅谈Flink(四)之Table API和SQL

浅谈Flink(四)之Table API和SQL

浅谈Flink(四)之Table API和SQL


原文始发于微信公众号(六只栗子):浅谈Flink(四)之Table API和SQL

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/88291.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!