【Flink-API】之复习窗口Window

一、Flink时间

1.1 概念

【Flink-API】之复习窗口Window1.EventTime 时间创建的时间,时间戳描述。
2.Ingestion Time 数据进入到Flink的时间
3.Processing Time 是每一个执行操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time

二、Window简介

2.1 Streaming

这是一种无界的数据流,不断增长的数据流。

2.2 window

window 是将一个无线的stream逻辑上拆分成有限大小的 bucket 筒,进行操作。

三、Window类型

3.1 CountWindow-固定条数窗口

按照指定的数据条数生成一个window,与时间无关

3.1.1 CountWindow.java

public class CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200"8888);

        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });

        //不分组 整体是一个组
        //输入5条后计算
        AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(5);

        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();

        env.execute();

    }
}

程序执行结果:【Flink-API】之复习窗口Window

3.1.2 CountWindowGroup.java

public class CountWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,3
        //hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200"8888);

        //分组后在进行划分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });

        //1.先分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        //2.划分窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> window = keyed.countWindow(5);

        //3.所有分组达到了条数才会执行 【5条数据全部拿到了,可以进行各种计算】
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);

        summed.print();

        env.execute();

    }
}

执行结果下:【Flink-API】之复习窗口Window

3.2 Tumbing Window-滚动窗口

【Flink-API】之复习窗口Window特点:特订的步长,比如5S滑动一次。适合做BI统计等等。

3.2.1 TumblingWindowAll.java

public class TumblingWindowAll {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200"8888);

        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });

        //不分组 5s 中聚合一次
        AllWindowedStream<Integer, TimeWindow> window = nums.timeWindowAll(Time.seconds(5));

        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();

        env.execute();

    }
}

3.2.2 TumblingWindowGroup.java

public class TumblingWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,3
        //hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200"8888);

        //分组后在进行划分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });

        //先分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        //划分窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.timeWindow(Time.seconds(5));

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}

3.3 Sliging Window-滑动窗口

【Flink-API】之复习窗口Window滑动窗口的大小可以与步长不等大小。窗口固定长度,有重叠。时间对齐。特点:算趋势。

3.3.1 SlidingWindow.java

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200"8888);

        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });

        //不分组 整体是一个组
        //窗口的长度为10s 5s中滑动一次
        AllWindowedStream<Integer, TimeWindow> window = nums.timeWindowAll(Time.seconds(10), Time.seconds(5));

        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();
        env.execute();
    }
}

执行结果【Flink-API】之复习窗口Window

3.4 Session Window-会话窗口

【Flink-API】之复习窗口Window按照指定的时间间隔划分一个窗口。

四、WaterMark

1.它是window延迟触发的机制
2.watermark >= 上一个窗口的结束边界就会触发窗口执行
3.watermark = 数据锁携带的时间【窗口的最大时间】- 延迟执行的时间


原文始发于微信公众号(Coding路人王):【Flink-API】之复习窗口Window

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41736.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!