1. Map
1.1 Overview
1. A DataStream -> DataStream transformation. 2. Each element of the input is mapped to exactly one output element (one-to-one).
1.2 MapFunction
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Map01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5);
        // Implementation 1: anonymous MapFunction that doubles each element
        SingleOutputStreamOperator<Integer> result1 = nums.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * 2;
            }
        });
        // Implementation 2: the same mapping as a lambda expression
        SingleOutputStreamOperator<Integer> result2 = nums.map(i -> i * 2);
        // sink
        result1.print();
        result2.print();
        env.execute();
    }
}
1.3 RichMapFunction
1. open(): called once, after the constructor and before the first map() call; use it for initialization such as opening database connections.
2. The Configuration parameter carries global configuration.
3. close(): called once before the function is destroyed; typically used to release resources.
Example:
// RichMapFunction
nums.map(new RichMapFunction<Integer, Integer>() {
    // open(): called once, after the constructor and before map(); initialization such as opening connections
    // the Configuration parameter carries global configuration
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public Integer map(Integer value) throws Exception {
        return value * 10;
    }

    // close(): called once before the function is destroyed; typically releases resources
    @Override
    public void close() throws Exception {
        super.close();
    }
});
2. FlatMap
2.1 Overview
Each input element can be split into zero or more output elements: a one-to-many transformation.
2.2 FlatMapFunction
import java.util.Arrays;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMap01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements(
                "GW001001 GW002002 GW003003",
                "GW001001 GW002002 GW003003",
                "GW001001 GW002002 GW003003");
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                // Implementation 1: JDK 8 stream with a method reference [recommended]
                //Arrays.stream(line.split(" ")).forEach(out::collect);
                // Implementation 2: forEach with a lambda
                //Arrays.asList(line.split(" ")).forEach(w -> out.collect(w));
                // Implementation 3: a plain for loop
                String[] tokens = line.split(" ");
                for (String word : tokens) {
                    out.collect(word);
                }
            }
        });
        words.print();
        env.execute();
    }
}
2.3 RichFlatMapFunction
Analogous to RichMapFunction: it adds the same open() and close() lifecycle methods.
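The original post gives no code for this section, so here is a minimal sketch, assuming the same whitespace-splitting logic as FlatMap01 (the class name SplitFlatMap is illustrative):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// The split logic of FlatMap01, plus the open()/close()
// lifecycle hooks inherited from the rich function base class.
public class SplitFlatMap extends RichFlatMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // one-time initialization, e.g. opening a database connection
    }

    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // one-time cleanup, e.g. releasing the connection
    }
}
```

It would be used exactly like the anonymous FlatMapFunction above: lines.flatMap(new SplitFlatMap()).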
3. Filter
3.1 Overview
1. The predicate returns true to keep an element and false to drop it.
2. The example below applies a logical test to each input, e.g. "is it odd?".
3.2 FilterFunction
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Filter01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // Implementation 1: true keeps the element, false drops it
        SingleOutputStreamOperator<Integer> filter1 = nums.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 != 0; // keep odd numbers
            }
        });
        // Implementation 2: lambda expression
        //SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> i >= 5);
        SingleOutputStreamOperator<Integer> filter2 = nums.filter(i -> {
            // a multi-line lambda body needs an explicit return
            return i >= 5;
        });
        filter1.print();
        filter2.print();
        env.execute();
    }
}
4. KeyBy
An operator for keyed, real-time computation.
4.1 Lambda implementation
1. Map each input element to a (word, 1) tuple, using a lambda expression instead of an anonymous Function, and read input from a socket opened on a VM for real-time aggregation.
2. On the CentOS VM, start the source with: nc -lk 8888
3. A tuple is a special fixed-size collection; fields are indexed from 0, and sizes go up to Tuple25.
4. Code:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBy01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // lambda instead of an anonymous Function: one element in, one tuple out
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // a tuple is a special fixed-size collection; fields are indexed from 0, up to Tuple25
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // print the keyed stream
        keyed.print();
        env.execute("KeyBy01");
    }
}
4.2 KeyBy with a custom entity class
1. The entity class WordAndCount (keying by field name requires a Flink POJO: a public no-argument constructor plus public fields or getters/setters):
public class WordAndCount {
    public String word;
    public Long counts;

    public WordAndCount() {}

    public WordAndCount(String word, Long counts) {
        this.word = word;
        this.counts = counts;
    }
}
2. keyBy + sum:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBy02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8808);
        // one element in, one WordAndCount out
        SingleOutputStreamOperator<WordAndCount> wordAndOne = lines.map(new MapFunction<String, WordAndCount>() {
            @Override
            public WordAndCount map(String value) throws Exception {
                return new WordAndCount(value, 1L);
            }
        });
        // key by a field name of the entity class
        KeyedStream<WordAndCount, Tuple> keyed = wordAndOne.keyBy("word");
        // aggregate the counts field
        SingleOutputStreamOperator<WordAndCount> sumed = keyed.sum("counts");
        keyed.print();
        sumed.print();
        env.execute();
    }
}
4.3 keyBy on multiple fields
Code:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBy03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828);
        // sample input, one "province,city,money" record per line:
        // 山东,烟台,2000
        // 山东,烟台,2000
        // 山东,烟台,2000
        SingleOutputStreamOperator<Tuple3<String, String, Double>> provinceCityMoney = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String line) throws Exception {
                // split into fields
                String[] fields = line.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });
        // group by province and city (multiple fields), then aggregate the last field;
        // for a custom bean class, field names can be used instead of indices
        SingleOutputStreamOperator<Tuple3<String, String, Double>> summed = provinceCityMoney.keyBy(0, 1).sum(2);
        summed.print();
        env.execute();
    }
}
5. Reduce
A general aggregation: sum can only add, while reduce lets you supply the combine logic yourself, so it can implement multiplication or any other custom operator.
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Reduce01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8828);
        // lambda instead of an anonymous Function: one element in, one tuple out
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // group by the word (tuple field 0)
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // aggregate: combine the accumulated value with each new element
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                String key = value1.f0;
                Integer count1 = value1.f1;
                Integer count2 = value2.f1;
                Integer counts = count1 + count2;
                return Tuple2.of(key, counts);
            }
        });
        reduced.print();
        env.execute();
    }
}
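To make the point above concrete, that reduce can express combiners sum cannot, here is a sketch of a reduce function that keeps a running product per key instead of a running sum (the class name ProductReduce is illustrative):

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// A custom combiner: value1 is the result accumulated so far for the key,
// value2 is the newly arrived element; the counts are multiplied, not added.
public class ProductReduce implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                          Tuple2<String, Integer> value2) throws Exception {
        return Tuple2.of(value1.f0, value1.f1 * value2.f1);
    }
}
```

Plugged in as keyed.reduce(new ProductReduce()), this turns the running count above into a per-key running product.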
6. Max
Example input:
spark,10
spark,20
hadoop,10
Each new record is compared with the history for its key; the maximum value is kept and smaller values are discarded.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Max01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String words = fields[0];
                int num = Integer.parseInt(fields[1]);
                return Tuple2.of(words, num);
            }
        });
        // group by the word
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // keep the maximum of field 1 per key; smaller values are discarded
        // (maxBy(1) would instead emit the complete record holding the maximum)
        SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1);
        max.print();
        env.execute();
    }
}
7. Sink
7.1 print()
Printing to the console is itself a sink.
7.2 CSV
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class AddSink01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String words = fields[0];
                int num = Integer.parseInt(fields[1]);
                return Tuple2.of(words, num);
            }
        });
        // group by the word
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyed.max(1);
        /**
         * A custom sink, e.g. writing to a database or to disk.
         * A sink has no return value.
         */
        max.addSink(new SinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                System.out.println(value);
            }
        });
        /**
         * Write to disk.
         * writeAsCsv requires the stream elements to be tuples.
         */
        max.writeAsCsv("F:\\out222", FileSystem.WriteMode.OVERWRITE);
        max.print();
        env.execute();
    }
}
7.3 RedisSink
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class MyRedisSink extends RichSinkFunction<Turbine> {
    // the Redis connection; transient so it is not serialized with the function
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }

    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // write to Redis
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }

    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}
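The post does not show the driver code for this sink. As a sketch, the ParameterTool must be registered as global job parameters so that open() can read redis.host and redis.db; the socket source, the "word,province,counts" line format, and the Turbine constructor are illustrative assumptions:

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RedisSinkJob {
    public static void main(String[] args) throws Exception {
        // e.g. run with: --redis.host 192.168.52.200 --redis.db 0
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // makes the parameters reachable via getRuntimeContext()
        // .getExecutionConfig().getGlobalJobParameters() in MyRedisSink.open()
        env.getConfig().setGlobalJobParameters(params);
        env.socketTextStream("192.168.52.200", 8888)
                .map(line -> {
                    // assumed input format: word,province,counts
                    String[] f = line.split(",");
                    return new Turbine(f[0], f[1], Long.parseLong(f[2]));
                })
                .addSink(new MyRedisSink());
        env.execute("RedisSinkJob");
    }
}
```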
7.4 MySqlSink
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySqlSink extends RichSinkFunction<GW200001> {
    // the connection should not be serialized with the function
    private transient Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // create the MySQL connection
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/turbine?characterEncoding=UTF-8", "root", "123456");
        System.out.println("connection established");
    }

    @Override
    public void invoke(GW200001 gw200001, Context context) throws Exception {
        // insert / update / aggregation logic goes here
        PreparedStatement pstm = null;
        try {
            pstm = conn.prepareStatement("insert into gw200001(wt_number, wt_date_time) values(?,?)");
            pstm.setString(1, gw200001.wt_number);
            pstm.setString(2, gw200001.wt_date_time);
            // execute the SQL (see also executeUpdate() / executeQuery())
            System.out.println("executing sql");
            pstm.execute();
        } finally {
            if (pstm != null) {
                pstm.close();
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        conn.close();
        System.out.println("connection closed");
    }
}
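Similarly, wiring this sink into a job could look like the sketch below. The socket source, the "number,datetime" line format, and the GW200001 field assignments are illustrative assumptions; note that open() is called once per parallel subtask, so each subtask holds its own JDBC connection:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // assumed input format: wt_number,wt_date_time
        DataStream<GW200001> records = env.socketTextStream("192.168.52.200", 8888)
                .map(line -> {
                    String[] f = line.split(",");
                    GW200001 r = new GW200001();
                    r.wt_number = f[0];
                    r.wt_date_time = f[1];
                    return r;
                });
        // each parallel subtask opens its own connection in MySqlSink.open()
        records.addSink(new MySqlSink());
        env.execute("MySqlSinkJob");
    }
}
```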
Originally published on the WeChat official account Coding路人王: 【Flink-API】之复习一系列Transformation/Sink操作 ("[Flink-API] Reviewing the Transformation/Sink operators").