一、Flink容错
1.1 State状态
Flink实时计算为了保证计算过程中,出现异常可以容错,就要中间结果的计算结果存储起来,这些中间数据就叫做State。State时多类型的,默认是保存在JobManger的内存中,也可以保存在TaskManager的本地文件中,也可以保存在本地文件系统或者HDFS这样的分布式文件系统中。
1.2 StateBackend
用来保存State的存储后端就叫做StateBackend,默认是保存在JobManger的内存中,也可以保存在TaskManager的本地文件中,也可以保存在本地文件系统或者HDFS这样的分布式文件系统中。程序:
public class StateBackend01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 只有开启了checkpoint,5s才会有重启策略,固定时间,无限重启
* 默认把中间结果保存于JobMananger的内存
*/
env.enableCheckpointing(5000);
//自定义重启固定次数,和重启时间
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000));
//本地目录:设置状态存储的后端,只是当前的job,建议在配置文件中全局配置
//env.setStateBackend(new FsStateBackend("file://D:\APP\IDEA\workplace\FlinkTurbineFaultDiagnosis\checkpoint"));
//HFDS:存储chenckpoint
System.setProperty("HADOOP_USER_NAME", "root");
env.setStateBackend(new FsStateBackend("hdfs://hadoop1:9000/checkpoint01"));
/**
* 程序异常退出,或者人为取消,不删除checkpoint目录数据
*/
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
SingleOutputStreamOperator<String> wangyining = lines.map(new MapFunction<String, String>() {
@Override
public String map(String line) throws Exception {
if (line.startsWith("wangyining")) {
throw new RuntimeException("老王的程序挂了!");
}
return line.toUpperCase();
}
});
wangyining.print();
env.execute();
}
}
1.3 CheckPointing
Flink为了实时容错,可以量中间结果定期的保存期起来,这种定期触发保存中间结果的机制叫做CheckPointing. CheckPointing 是周期执行的.具体的过程是JobManager定期的向TaskManager中的SubTask发送RPC消息,SubTask 将其计算的State保存StateBackEnd 中,并晌JobManager相应Checkpoint是否成功。如果程序出现异常或重启TaskManager 中的SubTask可以从上-一次成功的CheckPointing的State恢复。
1.4 重启策略
Flink实时计算程序,为了容错,需要开启CheckPointing,- – 旦开启CheckPointing,如果没有重启策略,默认的重启策略是无限重启,可以也可以设置其他重启策略,如:重启固定次数且可以延迟执行的策略。
public class RestartStrategies01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 只有开启了checkpoint,如 5s 才会有重启策略,固定时间,无限重启
* 默认把中间结果保存于JobMananger的内存
*/
env.enableCheckpointing(5000);
//自定义重启固定次数,和重启时间
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000));
DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
SingleOutputStreamOperator<String> wangyining = lines.map(new MapFunction<String, String>() {
@Override
public String map(String line) throws Exception {
if (line.startsWith("wangyining")) {
throw new RuntimeException("老王的程序挂了!");
}
return line.toUpperCase();
}
});
wangyining.print();
env.execute();
}
}
原文始发于微信公众号(Coding路人王):【Flink-容错API】重启策略-CheckPoint/StateBackend
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41759.html