【Flink-API】取消Kafka记录偏移量,flink管理偏移量

一、偏移量

1.1 取消kafka偏移量

默认是使用kafka来管理偏移量,下面我们来取消他的管理机制,使用flink来管理偏移量,开启flink的容错重启机制,还有中间数据state的保存机制。1.开启重启策略
2.stateBackend存储位置
3.取消任务checkpoint不删除   4.设置checkpoint的 EXACTLY_ONCE 模式
5.【重点】取消kafka管理偏移量,让flink来管理偏移量

1.2 kafka

创建topic如下,两个副本,四个分区

bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 4 --topic superman2

1.3 KafkaSourceV2.java

public class KafkaSourceV2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启重启策略
        env.enableCheckpointing(5000);
        // stateBackend
        env.setStateBackend(new FsStateBackend("file:///D://APP//IDEA//workplace//FlinkTurbineFaultDiagnosis//src//main//resources//checkpoint//chk001"));
        // 取消任务checkpoint不删除
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置checkpoint的 EXACTLY_ONCE 模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);


        //kafka配置
        String topic = "superman2";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","earliest");
        prop.setProperty("group.id","consumer3");

        // 【重点】取消kafka管理偏移量,让flink来管理偏移量
        prop.setProperty("enable.auto.commit","false");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

        DataStreamSource<String> lines = env.addSource(kafkaSource);

        DataStreamSource<String> lines2 = env.socketTextStream("192.168.52.200"8888);
        // 模拟异常
        lines2.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                if (s.startsWith("wangyining")){
                    System.out.println(1/0);
                }
                return s;

            }
        }).print();

        lines.print();

        env.execute();

    }
}

以上程序中的偏移量保存到了StateBackend的中间数据目录,来进行checkpoint,不但把保存到了这里,还保存到了kafka中偏移量,只是用来监控
【Flink-API】取消Kafka记录偏移量,flink管理偏移量

1.4 启动测试

1.开启端口号 nc  -lk  8888 2.启动程序
3.启动kafka生产者,生产数据

bin/kafka-console-producer.sh --broker-list 192.168.52.200:9092,192.168.52.201:9092,192.168.52.202:9092 --topic superman2

【Flink-API】取消Kafka记录偏移量,flink管理偏移量4.模拟异常
if (s.startsWith(“wangyining”)){
System.out.println(1/0);
}
【Flink-API】取消Kafka记录偏移量,flink管理偏移量5.再次kafka输入命令,会发现已经重启成功,偏移量已经纪录成功。
【Flink-API】取消Kafka记录偏移量,flink管理偏移量

6.问题:程序出现了异常,恢复subTask,读取偏移量,从StateBackend恢复还是kafka中恢复?
答案:
6.1.StateBackend 偏移量目录,你会发现kafka中特殊的topic也有偏移量,但是它不是用来恢复的,是用来做监控的。
6.2.如果你不想让kafka中特殊的topic有偏移量,也可以取消。官网中这样记载。
【Flink-API】取消Kafka记录偏移量,flink管理偏移量6.3.只需要添加一下:

  // kafka中的偏移量取消掉【不建议false,默认是true,它用来监控这个偏移量】
        kafkaSource.setCommitOffsetsOnCheckpoints(false);

6.4.从kafka生产者中继续写数据的话,并没有从头读取,也不会从kafka中特殊的topic继续读取。7.如果停掉程序呢?7.1.如果开启了checkpointing,一定会保存到statebackend中去 7.2.没有指定savepoint的话,首先查看kafka特殊的topic,然后查看恢复文件。


原文始发于微信公众号(Coding路人王):【Flink-API】取消Kafka记录偏移量,flink管理偏移量

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41764.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!