【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

一、环境

1.centos7.6
2.zookeeper-3.4.5
3.kafka_2.11-0.10.2.1
4.jdk1.8_261

二、单机-ECS-zookeeper

tar -zxvf zookeeper-3.4.5 -C /root/apps/
cp zoo_simple.cfg  zoo.cfg
提前创建好数据和目录日志文件夹
dataDir=/root/data/zookeeper
dataLogDir=/root/data/zookeeperlog

三、单机-ECS-kafka

3.1 安装

tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/

3.2 修改server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
host.name=172.17.81.232 # 阿里云内网地址
advertised.host.name=47.94.39.202 # 阿里云外网地址
zookeeper.connect=localhost:2181 #zookeeper地址

3.3 启动本机测试

# 启动
bin/kafka-server-start.sh -daemon config/server.properties 

# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hanyaoyao

# 开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hanyaoyao

# 开启消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic hanyaoyao --from-beginning

四、VMware单机

4.1 zookeeper单机

五、VMware集群

hadoop1 192.168.52.200  
hadoop2 192.168.52.201  
hadoop3 192.168.52.202

5.1 zookeeper集群

1.解压安装
tar -zxvf zookeeper-3.4.5 -C /root/apps/
2.进入conf目录
cp zoo_simple.cfg  zoo.cfg  
3.数据目录【提前创建好集群的三个目录】
dataDir=/root/zkdata

4.集群配置

server.1=192.168.52.201:2888:3888  
server.2=192.168.52.202:2888:3888  
server.3=192.168.52.200:2888:3888  

5.集群分发

scp -r zookeeper/ hadoop2:$PWD  
scp -r zookeeper/ hadoop3:$PWD  

6.逐台启动
bin/zkStart.sh start
7.查看状态
bin/zkStart status
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

5.2 kafka集群

1.tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/
2.修改配置文件如下:

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://hadoop1:9092
host.name=hadoop1
log.dirs=/root/data/kafka
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.分发集群

scp  kafka_2.11-0.10.2.1/ hadoop2:$PWD
scp  kafka_2.11-0.10.2.1/ hadoop3:$PWD

4.修改hadoop2,hadoop3的集群编号 vi server.properties

broker.id=1
broker.id=2

5.逐台启动测试

1.启动
bin/kafka-server-start.sh -daemon config/server.properties  bin/kafka-server-start.sh -daemon config/server.properties

2..创建topic
bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 2 --topic superman
# Created topic "superman".

3.创建生产者
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic superman

4.创建消费者
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic superman  --from-beginning

【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
在这里插入图片描述

六、Flink-Kafka

6.1 pom

  <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
            <!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 --
            <!--<scope>provided</scope>-->
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
            
        </dependency>
                <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>

6.2 Flink-KafkaSource

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //kafka配置
        String topic = "superman";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","earliest");
        prop.setProperty("group.id","consumer3");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

        DataStreamSource<String> lines = env.addSource(kafkaSource);
        lines.print();
        env.execute();
    }
}

七、错误解决

在连接kafkasource时候,总是消费不到kafka中的数据,开始怀疑以下问题:
1.zookeeper集群,kafka集群消息不通
2.宿主机与虚拟机网络不通
3.flink版本与kafka版本jar冲突
4.windows防火墙问题
5.hosts文件的主机名没有配置
最后经过查文档和排除问题,终于得知了zookeeper在集群中的消息是以主机名发送的,所以需要配置主机名。
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境


原文始发于微信公众号(Coding路人王):【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41719.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!