一、环境
1.centos7.6
2.zookeeper-3.4.5
3.kafka_2.11-0.10.2.1
4.jdk1.8_261
二、单机-ECS-zookeeper
tar -zxvf zookeeper-3.4.5 -C /root/apps/
cp zoo_simple.cfg zoo.cfg
提前创建好数据和目录日志文件夹
dataDir=/root/data/zookeeper
dataLogDir=/root/data/zookeeperlog
三、单机-ECS-kafka
3.1 安装
tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/
3.2 修改server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
host.name=172.17.81.232 # 阿里云内网地址
advertised.host.name=47.94.39.202 # 阿里云外网地址
zookeeper.connect=localhost:2181 #zookeeper地址
3.3 启动本机测试
# 启动
bin/kafka-server-start.sh -daemon config/server.properties
# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hanyaoyao
# 开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hanyaoyao
# 开启消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic hanyaoyao --from-beginning
四、VMware单机
4.1 zookeeper单机
略
五、VMware集群
hadoop1 192.168.52.200
hadoop2 192.168.52.201
hadoop3 192.168.52.202
5.1 zookeeper集群
1.解压安装
tar -zxvf zookeeper-3.4.5 -C /root/apps/
2.进入conf目录
cp zoo_simple.cfg zoo.cfg
3.数据目录【提前创建好集群的三个目录】
dataDir=/root/zkdata
4.集群配置
server.1=192.168.52.201:2888:3888
server.2=192.168.52.202:2888:3888
server.3=192.168.52.200:2888:3888
5.集群分发
scp -r zookeeper/ hadoop2:$PWD
scp -r zookeeper/ hadoop3:$PWD
6.逐台启动
bin/zkStart.sh start
7.查看状态
bin/zkStart status
5.2 kafka集群
1.tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/
2.修改配置文件如下:
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://hadoop1:9092
host.name=hadoop1
log.dirs=/root/data/kafka
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181
3.分发集群
scp kafka_2.11-0.10.2.1/ hadoop2:$PWD
scp kafka_2.11-0.10.2.1/ hadoop3:$PWD
4.修改hadoop2,hadoop3的集群编号 vi server.properties
broker.id=1
broker.id=2
5.逐台启动测试
1.启动
bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-start.sh -daemon config/server.properties
2..创建topic
bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 2 --topic superman
# Created topic "superman".
3.创建生产者
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic superman
4.创建消费者
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic superman --from-beginning

六、Flink-Kafka
6.1 pom
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
<!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 --
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
6.2 Flink-KafkaSource
public class KafkaSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//kafka配置
String topic = "superman";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("auto.offset.reset","earliest");
prop.setProperty("group.id","consumer3");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
DataStreamSource<String> lines = env.addSource(kafkaSource);
lines.print();
env.execute();
}
}
七、错误解决
在连接kafkasource时候,总是消费不到kafka中的数据,开始怀疑以下问题:
1.zookeeper集群,kafka集群消息不通
2.宿主机与虚拟机网络不通
3.flink版本与kafka版本jar冲突
4.windows防火墙问题
5.hosts文件的主机名没有配置
最后经过查文档和排除问题,终于得知了zookeeper在集群中的消息是以主机名发送的,所以需要配置主机名。
原文始发于微信公众号(Coding路人王):【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41719.html