SpringBoot集成Kafka – 用Multi-Consumer实现数据高吞吐

大家好,我是一安,之前一直介绍RabbitMQ,今天介绍一下实际开发中SpringBoot如何集成kafka实现高吞吐

简介

Apache Kafka是一个分布式发布 – 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。

Kafka适合离线和在线消息消费, Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。

Kafka构建在ZooKeeper同步服务之上,它与Apache Storm Spark非常好地集成,用于实时流式数据分析。

Kafka依赖于日志顺序写, 因此支持消息回溯和支撑高性能读写

基础概念

  • Produce:消息生产者,向消息系统发送消息的客户端
  • Consumer:消息消费者,从消息系统读取消息的客户端
  • Broker:服务代理节点(集群),用以处理消息。对于Kafka而言,Broker可以简单地看作一个独立的Kafka实例节点,Kafka集群由多个服务代理节点broker组成
  • Topic:主题,是一个逻辑概念,其作用类似于关系型数据库的表
  • Partition:分区,为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic分为多个partition(分布在不同的服务器上,具体在.log文件里),序号从0开始,每个partition是一个有序队列。partition中每条消息都会被分配一个有序的ID(offset)。Kafka只保证一个partition中的消息的有序性,不保证一个topic的多个partition间的顺序;
  • Offset:偏移量,partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息
  • Consumer group:消费者群组,一个topic可以有多个consumer,每个partition只会把消息发给消费者组中的一个consumer
  • Leader & Follower:Leader和Follower是针对于Partition提出的概念,Consumer和Producer的读写操作都是基于Leader分区的,其他Follwer提供对Leader的主从复制同步,用以故障切换。Leader和Follower可以存在于不同的broker中SpringBoot集成Kafka - 用Multi-Consumer实现数据高吞吐

场景

  • 低吞吐量:在默认情况,一个Topic只有一个Partition。Producer(s)将消息添加到该Partition的末尾,一个Consumer从该Partition中依序读走消息去处理

  • 高吞吐量:

    • 为了加快写入消息的速度,需要将一个Topic分成多个分区,Topic的分区是有状态的,增加分区后不可减少
    • 为了加快读取消息的速度,需要有多个Consumer同时来从分区中读取消息
    • 用Consumer Group来保证一个分区只能被一个Consumer读取

在一个Consumer Group内,一个Consumer可以读取多个分区,但是一个分区只能被一个Consumer读取

演示

环境搭建

小编为了方便就简单搭建一个集群环境,如果想了解详细搭建过程,后面小编会专门做一篇kafka的集群搭建过程,包括实现SASL认证

1.通过镜像创建容器并运行zookeeper
  docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

2.通过镜像创建容器并运行kafka0,kafka1,kafka2
  docker run -d --name kafka0 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.5.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.128:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

  docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.5.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.128:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka

  docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.5.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.128:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka

3.进入容器容器创建主题
  docker exec -it kafka0 /bin/bash
  cd /opt/kafka_2.13-2.8.1/bin
  kafka-topics.sh --create --zookeeper 192.168.5.128:2181 --replication-factor 1 --partitions 3 --topic test

4.查看新创建的topic信息
  kafka-topics.sh --describe --zookeeper 192.168.5.128:2181 --topic test
  
  Topic: test     TopicId: K8xJNqDjQfSVHPreTVNKNQ PartitionCount: 3       ReplicationFactor: 1    Configs:
  Topic: test     Partition: 0    Leader: 2       Replicas: 2     Isr: 2
  Topic: test     Partition: 1    Leader: 0       Replicas: 0     Isr: 0
  Topic: test     Partition: 2    Leader: 1       Replicas: 1     Isr: 1  

代码示例

1.引入依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.1.1</version>
    </dependency>

2.配置信息

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.5.128:9092,192.168.5.128:9093,192.168.5.128:9094
#=============== producer  =======================
server.port=8090
spring.kafka.producers.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.properties.sasl.mechanism=PLAIN
#spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
#=============== consumer  =======================
kafka.consumer.topic = test
# 指定默认消费者组
#group id
spring.kafka.consumer.group-id=test-consumer-group
#offset偏移量规则设置:
#(1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#(2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#(3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.配置消费者

@Configuration
@Data
public class PropsConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String broker;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
}
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Autowired
    PropsConfig propsConfig;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
//        propsMap.put("security.protocol""SASL_PLAINTEXT");
//        propsMap.put("sasl.mechanism""PLAIN");
        return propsMap;
    }

    @Bean
    public MyListener listener() {
        return new MyListener();
    }
}

这里整合了两种形式,一种不指定消费者和分区,一种指定消费者和分区

public class MyListener {
    private static final Logger log = LoggerFactory.getLogger(MyListener.class);
    private static final String TPOIC = "test";

    // 监听多个topic
    @KafkaListener(topics = {"#{'${kafka.consumer.topic}'.split(',')}"})
    public void listenPartition(List<ConsumerRecord<?, ?>> records) {
        log.info("Id Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                int partition = record.partition();
                log.info("topic {} partition {} Received message={}",  topic,partition,message);
            }
        }
    }

//    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("topic {} p0 Received message={}",  topic,message);
            }
        }
    }

//    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("topic {} p1 Received message={}",  topic,message);
            }
        }
    }

//    @KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "2" }) })
    public void listenPartition2(List<ConsumerRecord<?, ?>> records) {
        log.info("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id2 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("topic {} p2 Received message={}", topic, message);
            }
        }
    }
}

4.启动项目,查看消费者组信息

1.容器内查看Consumer Group详情
  kafka-consumer-groups.sh --bootstrap-server 192.168.5.128:9092,192.168.5.128:9093,192.168.5.128:9094 --describe  --group test-consumer-group

2.当消费者里配置setConcurrency=2时,可以看到有消费者消费两个分区
  GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
  test-consumer-group test            0          0               0               0               consumer-1-a22a56ed-7324-44f9-9a60-e47c21e4b0b2 /192.168.5.1    consumer-1
  test-consumer-group test            1          0               0               0               consumer-1-a22a56ed-7324-44f9-9a60-e47c21e4b0b2 /192.168.5.1    consumer-1
  test-consumer-group test            2          1               1               0               consumer-2-bad2eb15-030b-4dcd-a6d8-1140bcccc90e /192.168.5.1    consumer-2

3.当消费者里配置setConcurrency=3时,可以看到消费者平均消费
  GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
  test-consumer-group test            0          0               0               0               consumer-1-b49eb667-7248-4e76-a600-635e83dc868e /192.168.5.1    consumer-1
  test-consumer-group test            2          1               1               0               consumer-3-2c8f0eb4-cf0f-4854-965a-82ee97fc1a1e /192.168.5.1    consumer-3
  test-consumer-group test            1          0               0               0               consumer-2-0bb8f9eb-5b29-48ce-bbf6-c8812a7864b0 /192.168.5.1    consumer-2

注意:在配置分区和Consumer数量时,一定要设置分区数 >= Consumer数,让一个Consumer至少读取一个分区,不能存在空闲的Consumer,否则就达不到提高吞吐量的目的

号外!号外!

如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!

SpringBoot集成Kafka - 用Multi-Consumer实现数据高吞吐

如何监听 MySQL 实现数据变化后的实时通知


实时监听MySQL同步数据到ElasticSearch


面试官:MQ 消息丢失、重复、积压问题,如何解决?


SpringBoot集成Kafka - 用Multi-Consumer实现数据高吞吐


原文始发于微信公众号(一安未来):SpringBoot集成Kafka – 用Multi-Consumer实现数据高吞吐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44901.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!