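Flink version: 1.12.5
1. Temporal tables
Here is the definition from the official documentation:
A temporal table represents the concept of a (parameterized) view on a table that records its change history; the view returns the content of the table at a specific point in time.
The Chinese term for temporal table, 时态表, spells this out: 时 ("time") is a point in time, and 态 ("state") is the state of the table at that moment. A temporal table can therefore be thought of simply as the result (or snapshot) of querying the table at a given moment. For example, if a row in a MySQL table is updated at 11:00, queries issued before and after 11:00 return different results, so a temporal table is a table that changes over time.
Flink queries temporal tables with the FOR SYSTEM_TIME AS OF syntax proposed in SQL:2011. Currently, temporal tables can only be used in joins.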
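As a rough illustration of the syntax, a temporal join generally has the following shape. The table and column names (`orders`, `rates`, `order_id`, `order_time`, `currency`, `rate`) are placeholders made up for this sketch and do not appear elsewhere in this article.

```sql
-- Hypothetical tables: `orders` is the driving (left) table,
-- `rates` is the temporal table being looked up.
SELECT o.order_id, r.rate
FROM orders AS o
JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency = r.currency;
```

Each row of `orders` is matched against the contents of `rates` as of that row's `order_time`, which has to be a time attribute (processing time or event time) of the left table.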
Next, let's look at a join example.
2. A join example
-- Source table: records consumed from Kafka
CREATE TABLE `send_table01` (
  `id` INTEGER,
  `address` STRING,
  -- Processing-time attribute, used later as the "time" of the temporal join
  `cloud_wise_proc_time` AS `proctime`(),
  -- Event-time attribute with a one-minute watermark delay
  `cloud_wise_event_time` TIMESTAMP(3),
  WATERMARK FOR `cloud_wise_event_time` AS `cloud_wise_event_time` - INTERVAL '1' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_source_004',
  'properties.bootstrap.servers' = '10.0.16.44:18108',
  'properties.max.poll.records' = '5000',
  'properties.group.id' = 'wrwrwrw',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'scan.startup.mode' = 'latest-offset'
);
-- Dimension (lookup) table read through the JDBC connector
CREATE TABLE `send_table02` (
  `name` STRING,
  `id` INTEGER,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',
  'url' = 'jdbc:clickhouse://10.101.1.234:18100/docp?socket_timeout=900000',
  'username' = 'default',
  'table-name' = 'pop'
);
-- Sink table: join results written back to Kafka
CREATE TABLE `out_table` (
  `name` STRING NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_out_60',
  'properties.bootstrap.servers' = '10.0.16.44:18108',
  'properties.acks' = '1',
  'properties.retries' = '0',
  'properties.batch.size' = '16348',
  'properties.linger.ms' = '0',
  'properties.buffer.memory' = '33554432',
  'json.ignore-parse-errors' = 'true',
  'format' = 'json'
);
-- Temporal (lookup) join driven by the processing-time attribute of the Kafka table
INSERT INTO `out_table`
(SELECT `b`.`name` AS `name`
 FROM `send_table01` AS `a`
 INNER JOIN `send_table02` FOR SYSTEM_TIME AS OF `a`.`cloud_wise_proc_time` AS `b`
   ON `a`.`id` = `b`.`id`);
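Note:
INNER JOIN `send_table02` FOR SYSTEM_TIME AS OF `a`.`cloud_wise_proc_time` AS `b`
The cloud_wise_proc_time column must be a processing-time attribute. In the example above, when Flink consumes a record from Kafka and looks up MySQL by the join key, it uses cloud_wise_proc_time as the "time" of the temporal table. In other words, the moment a Kafka record arrives is the point in time at which MySQL (the temporal table) is queried.
If the event time carried in the Kafka data were used as the "time" instead, and a record had an event time from some moment last year, Flink could report an error. Intuitively, the MySQL table may not even have existed last year; concretely, the JDBC table declares no row-time attribute (no watermark), so Flink has no version history to look up and rejects the query during planning.
So in dimension-table join scenarios like this one, when Flink reports an error such as the following, check whether the wrong kind of time attribute was used: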
Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.
org.apache.flink.table.api.ValidationException: Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:285) ~[flink-table-blink_2.11-1.12.5.jar:1.12.5]
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) ~[flink-table-blink_2.11-1.12.5.jar:1.12.5]
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) ~[flink-table-blink_2.11-1.12.5.jar:1.12.5]
at org.apache.calcite.plan.hep.HepPlanner.applyRules(
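The message above states what an event-time temporal join expects from the right-hand table: a primary key plus a row-time attribute, which in DDL means a WATERMARK declaration. The following is only a sketch of such a versioned table, assuming the dimension data were available as a Debezium changelog in Kafka. The table name `send_table02_versioned`, the topic `topic_dim_changelog`, the consumer group, and the `update_time` column are hypothetical and not part of the original job.

```sql
-- Hypothetical versioned table: primary key + event-time attribute (watermark),
-- fed by a changelog so that Flink can track the version history of each key.
CREATE TABLE `send_table02_versioned` (
  `id` INTEGER,
  `name` STRING,
  `update_time` TIMESTAMP(3),                       -- assumed column: when the row last changed
  WATERMARK FOR `update_time` AS `update_time`,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_dim_changelog',                  -- assumed topic name
  'properties.bootstrap.servers' = '10.0.16.44:18108',
  'properties.group.id' = 'dim_changelog_reader',   -- assumed consumer group
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- Event-time temporal join: each Kafka record is matched against the version of
-- the dimension row that was valid at the record's event time.
SELECT `a`.`id`, `b`.`name`
FROM `send_table01` AS `a`
INNER JOIN `send_table02_versioned` FOR SYSTEM_TIME AS OF `a`.`cloud_wise_event_time` AS `b`
  ON `a`.`id` = `b`.`id`;
```

A plain JDBC table such as `send_table02` carries no such history, which is why the processing-time attribute is the one to use for the lookup join in this article.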
Compiled by 极客之音. Article link: https://www.bmabk.com/index.php/post/65407.html