1. FlinkKafkaToRedis
Consider a question first: when a Flink job runs, in what ways can we pass configuration values into it?
1. Hard-code the values in the program.
2. Pass them dynamically through program arguments such as args[0]:
ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
String groupId = parameters.get("group.id", "consumer1");
String topics = parameters.getRequired("topics");
3. Read a configuration file dynamically (a combined sketch follows this list).
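A minimal sketch combining options 2 and 3: prefer a properties-file path passed as args[0], otherwise fall back to a classpath resource. The helper class name ConfigLoader and the resource name config.properties are assumptions, not part of the original post.

import org.apache.flink.api.java.utils.ParameterTool;
import java.io.InputStream;

public class ConfigLoader {
    // Hedged sketch: option 2 (path in args[0]) with a fallback to
    // option 3 (config.properties on the classpath).
    public static ParameterTool load(String[] args) throws Exception {
        if (args.length > 0) {
            return ParameterTool.fromPropertiesFile(args[0]);
        }
        // Assumes config.properties is packaged on the classpath
        try (InputStream in = ConfigLoader.class.getClassLoader()
                .getResourceAsStream("config.properties")) {
            return ParameterTool.fromPropertiesFile(in);
        }
    }
}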
1.1 pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
1.2 config.properties
# Multiple topics can be passed in, separated by commas
topics=wang
group.id=consumer1
bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
# alternative: earliest
auto.offset.reset=latest
# Kafka does not commit offsets itself; Flink manages them through checkpoints
enable.auto.commit=false
# Checkpoint every 30s. The default state backend is memory; since no checkpoint
# directory is specified here, state is kept in the JobManager's memory.
# You can configure HDFS or a local path yourself, for example:
# env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
# env.setStateBackend(new FsStateBackend("file:///D://APP//IDEA//workplace//FlinkTurbineFaultDiagnosis//checkpoint"));
checkpoint.interval=30000
# redis
redis.host=127.0.0.1
#redis.pwd=123456
redis.db=1
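As the comments above note, checkpoints live in the JobManager's memory by default. A minimal sketch of switching to a durable FsStateBackend, using the HDFS path from the commented example (the path and the helper class name are assumptions about your setup):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendConfig {
    // Hedged sketch: persist checkpoint data to HDFS instead of JobManager memory.
    public static void configure(StreamExecutionEnvironment env) {
        env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
    }
}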
1.3 FlinkUtils.java
public class FlinkUtils {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static <T> DataStream<T> createKafkaStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        // 1. Register the parameters globally so every operator can read them
        env.getConfig().setGlobalJobParameters(parameters);
        // 2. Checkpoint configuration
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 5000L), CheckpointingMode.EXACTLY_ONCE);
        // 3. Keep the externalized checkpoint data when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 4. Kafka configuration
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", parameters.getRequired("bootstrap.servers"));
        prop.setProperty("group.id", parameters.getRequired("group.id"));
        prop.setProperty("auto.offset.reset", parameters.get("auto.offset.reset", "earliest"));
        // 5. Do not auto-commit offsets; leave them to Flink's checkpoints
        prop.setProperty("enable.auto.commit", parameters.get("enable.auto.commit", "false"));
        String topics = parameters.getRequired("topics");
        List<String> topicList = Arrays.asList(topics.split(","));
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                prop);
        return env.addSource(kafkaConsumer);
    }

    // Get the execution environment
    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}
1.4 MyRedisSink.java
public class MyRedisSink extends RichSinkFunction<Turbine> {

    // Redis connection, initialized in open(); transient so it is not serialized
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }

    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // Write to Redis: hash key = word, field = province, value = counts
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }

    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}
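The Turbine POJO itself is not shown in the original post. A minimal sketch inferred from how the sink above and the map function below use it (public fields word, province, counts and a static of() factory); the real class may differ:

public class Turbine {
    // Fields inferred from usage in MyRedisSink and KafkaToRedis
    public String word;
    public String province;
    public long counts;

    // Flink POJO serialization requires a public no-arg constructor
    public Turbine() {}

    public static Turbine of(String word, String province, long counts) {
        Turbine turbine = new Turbine();
        turbine.word = word;
        turbine.province = province;
        turbine.counts = counts;
        return turbine;
    }
}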
1.5 KafkaToRedis
public class KafkaToRedis {

    public static void main(String[] args) throws Exception {
        // Note: backslashes in a Java string literal must be escaped
        ParameterTool parameters = ParameterTool.fromPropertiesFile("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\config.properties");
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        lines.print();
        // The input is a String; map each line to a Turbine object
        SingleOutputStreamOperator<Turbine> map = lines.map(new MapFunction<String, Turbine>() {
            @Override
            public Turbine map(String value) throws Exception {
                String[] fields = value.split(" ");
                String word = fields[0];
                String province = fields[1];
                long counts = Long.parseLong(fields[2]);
                return Turbine.of(word, province, counts);
            }
        });
        map.addSink(new MyRedisSink());
        // Execute the job
        FlinkUtils.getEnv().execute();
    }
}
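To try the pipeline end to end, one option is to push a line in the "word province count" format the map function expects into the topic. A hedged sketch: the sample record "hello Beijing 3" is made up, while the brokers and topic name come from config.properties above.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // close() flushes pending records before returning
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One record in the "word province count" format the map function expects
            producer.send(new ProducerRecord<>("wang", "hello Beijing 3"));
        }
    }
}

If everything is wired up, the sink writes a Redis hash with key hello, field Beijing, value 3, which you can inspect with HGETALL hello in redis-cli.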