大家好我是栗子鑫,今天主要想给大家介绍一下 FlinkAPI 的一种 DataStream,希望对大家有所帮助,正文如下:
上一篇公众号介绍了三种 API 的 ProcessFunction,今天主要想介绍一下三种 API 里面的 DataStream。
image-20220825211143498
Flink 官网介绍:DataStream 是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。
这里主要介绍 DataStream API 的两个主要功能:Streams 和 windows
01
—
Streams
Streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集。流式计算可以理解为数据流在传递的过程中“被处理”了,至于如何被处理就是 DataStream API 的功能了,这里大致介绍几个,配上案例方便大家理解:
-
Map 类似 java,python 的 map 函数一样,对每一个元素进行处理
DataStreamSource<Integer> rowData = env.fromElements(1, 2, 3, 4, 5);
rowData.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
//平方
return integer * integer;
}
}).print("mapAPI");
env.execute();结果:mapAPI> 1 mapAPI> 4 mapAPI> 9 mapAPI> 16 mapAPI> 25
-
flatmap 将一个数据通过自定义的规则“转化为”多个或 0 个数据,保留一个句子中包含字母 o 的单词。
DataStreamSource<String> rowData4flatmap = env.fromElements("Thank you for following six chestnuts");
rowData4flatmap.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
for (String word : s.split(" ")){
if(word.contains("o")){
collector.collect(word);
}
}
}
}).print("flatmapAPI"); -
Filter 也像 java,python 的 filter 函数一样,过滤掉某些不满足条件的数据, 过滤掉长度小于 6 的单词
DataStreamSource<String> rowData4flatmap = env.fromElements("Thank you for following six chestnuts");
rowData4flatmap.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
for (String word : s.split(" ")){
collector.collect(word);
}
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.length() >=6;
}
}).print("FilterAPI");FilterAPI> following
FilterAPI> chestnuts
代码说明:借用上一个 🌰,先进行 flatmap 将句子拆分为一个一个的单词,然后对单词过滤,只保留长度大于等于 6 的单词。
stream 的处理上述还有很多,如 KeyBy、Rolling Aggregation 算子、Split 和 Select 的等,这里不在一一介绍,如果读者感兴趣可以去官网查阅具体细节。
02
—
Windows
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完善的窗口机制。所谓窗口(window),就是把一个无限的 stream 拆分成有限大小的buckets
桶,我们可以在这些桶上做计算操作。
Window 可以分成两类:
CountWindow:按照指定的数据条数生成一个 Window,与时间无关。比如每 50 个数据统计一次。
TimeWindow:按照时间生成 Window。比如每隔几分钟分钟统计一次。
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(TumblingWindow)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
今天主要讲一下基于时间生成 Window:
-
滚动窗口(Tumbling Windows)
滚动窗口是按固定的时间段或长度(比如小时或元素个数)来分片 stream 中的元素。
image-20220827221319900
按照时间来进行窗口划分,每次窗口的滑动距离等于窗口的长度,这样数据不会重复计算。
官网伪代码:
/ tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
//Time.milliseconds(x)、Time.seconds(x)、 Time.minutes(x)等之一指定时间间隔
-
滑动窗口(Sliding Window)
滑动窗口是滚动窗口的更广义的一种形式,滑动窗口由固定的窗口大小和滑动间隔组成。如果滑动周期小于窗口大小,那么窗口会发生部分重叠,在这种情况下元素会被分配到多个窗口中。而如果滑动周期跟窗口大小相等,则该窗口就是滚动窗口。这里借用官网的一张图来解释[1]:
image-20220828203753884
Window size 表示滑动窗口的大小,window slide 为一次滑动的距离,如果 size<slide 的时候会出现数据丢失,反之当 size > slide 的时候,会出现数据属于多个窗口,例如有大小为 10 分钟的窗口滑动 5 分钟。这样,每 5 分钟就会获得一个窗口,其中包含在最后 10 分钟内到达的事件。
伪代码:
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
-
会话窗口 (Session Window)
会话窗口分配器按活动会话对元素进行分组。与翻滚窗口和滑动窗口相比,会话窗口不重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时,即当出现不活动间隙时,会话窗口将关闭。会话窗口分配器可以配置有静态会话间隙或 会话间隙提取器功能,该功能定义不活动的时间长度。当此期限到期时,当前会话关闭,后续元素被分配到新的会话窗口。
image-20220828211447938 会话窗口特点:时间无对齐。session 间隔(session gap)必须基于时间,没有基于个数的会话窗口。
伪代码:
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
03
—
总结
上述笔者简单介绍了 Datastream 的 API,主要介绍 window 相关的,笔者在现实场景中并没有涉及到,所以都是使用的是官网的伪代码,后期会更加详细的介绍窗口函数并会更新在笔者的github[2]。后期更新后会第一时间通知。希望对你们能有所帮助。
关注六只栗子,面试不迷路!
04
—
参考资料
图来解释: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/windows/
[2]
github: https://github.com/zhangxinfang520/FlinkStudy.git
作者 栗子鑫
编辑 一口栗子
原文始发于微信公众号(六只栗子):浅谈 Flink(三)之 DataStream
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/88330.html