记一次Kakfa消费者集群的生产事故

导读:本篇文章讲解 记一次Kakfa消费者集群的生产事故,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

由于项目业务需要,在之前的Kafka Consumer集群的自动提交设置为手动提交,先介绍一下怎么设置为手动提交。

设置手动提交

consumer

@Component
public class KafkaConsumer {

    @Autowired
    private CustomConfig customConfig;

    @Autowired
    private StatsDClient statsDClient;

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServer;

    @Value("${spring.kafka.sfc-bootstrap-servers}")
    private String kafkaSfcServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String kafkaGroupId;

    @Autowired
    private GPSKafkaConfig gpsKafkaConfig;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitMs;


    @Value("${spring.kafka.listener.ack-count}")
    private Integer ackCount;

    @Value("${spring.kafka.listener.ack-time}")
    private Integer ackTime;



    private Map<String, Object> initProps() {
        Map<String, Object> props = new HashMap<>();
        
        // kafka的ip地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitMs);

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, gpsKafkaConfig.getSessionTimeoutMs());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, gpsKafkaConfig.getMaxPollIntervalMs());

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, gpsKafkaConfig.getMaxPollRecords());
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, gpsKafkaConfig.getFetchMinBytes());
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, gpsKafkaConfig.getFetchMaxWaitMs());

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);

        return props;
    }


    @Bean("customContainerFactory")
    public ConcurrentKafkaListenerContainerFactory customContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(initProps()));
        factory.setConcurrency(gpsKafkaConfig.getGpsConcurrency());
        factory.setBatchListener(true);


        // 改为手动提交必须设置的参数,否则会报错

        // 设置手动提交的模式
        //RECORD:每处理一条commit一次
        //BATCH(默认):每次poll的时候批量提交一次,频率取决于每次poll的调用频率
        //TIME:每次间隔ackTime的时间去commit
        //COUNT:累积达到ackCount次的ack去commit
        //COUNT_TIME:ackTime或ackCount哪个条件先满足,就commit
        //MANUAL:listener负责ack,但是背后也是批量上去
        //MANUAL_IMMEDIATE:listner负责ack,每调用一次,就立即commit  

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        // 当ackMode为“COUNT”或“COUNT_TIME”时,偏移提交之间的记录数
        factory.getContainerProperties().setAckCount(ackCount);
        // 当ackMode为“TIME”或“COUNT_TIME”时,偏移提交之间的时间(以毫秒为单位)
        factory.getContainerProperties().setAckTime(ackTime);
        
        return factory;
    }


    @KafkaListener(topics = {"消费的topic名称"}, containerFactory = "customContainerFactory")
    public void orderMessage(String message,Acknowledgment ack) {
        
        // 你的业务

        // 执行结束,手动提交
        ack.acknowledge();
    }

}

Config

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;


@ConfigurationProperties(prefix = "spring.kafka.custom")
@Configuration
@Data
public class GPSKafkaConfig {
    /**
     * kafka消费线程数
     */
    private Integer gpsConcurrency;
    /**
     * 允许自动提交
     */
    private Boolean enableAutoCommit;
    /**
     * 自动提交时间
     */
    private Integer autoCommitIntervalMs;
    /**
     * session.timeout.ms
     */
    private Integer sessionTimeoutMs;
    /**
     * max.poll.interval.ms
     */
    private Integer maxPollIntervalMs;
    /**
     * 返回的最大record数
     */
    private Integer maxPollRecords;

    /**
     * 一个parttion拉取的最小字节数
     */
    private Integer fetchMinBytes;
    /**
     * 拉取数据的时候最长等待时间,与fetch.min.bytes配合使用
     */
    private Integer fetchMaxWaitMs;

    private Integer abandonPercent;

    /**
     * 顺风车丢弃gps消息比例
     */
    private Integer sfcAbandonPercent;
}

YML 

  kafka:
    listener:
      #当enable-auto-commit为false时生效
      ack-mode: manual
      #当ackMode为“COUNT”或“COUNT_TIME”时,偏移提交之间的记录数
      ack-count: 10
      #当ackMode为“TIME”或“COUNT_TIME”时,偏移提交之间的时间(以毫秒为单位)
      ack-time: 10000
      concurrency: 50
    bootstrap-servers: xx.xx.xx.xx:9092
    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100
      group-id: group1
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    custom:
      gpsConcurrency: 120
      enableAutoCommit: false
      autoCommitIntervalMs: 100
      sessionTimeoutMs: 300000
      maxPollIntervalMs: 500000
      maxPollRecords: 50
      fetchMinBytes: 51200
      fetchMaxWaitMs: 2000
      abandonPercent: 10

消费者集群改为手动提交引发的问题

1、如果不是单台机器,是集群的,那么会有分片问题

commit failed 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
        at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?

问题分析

这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms
该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

分析原因

如下是我们消费者处理逻辑(省略部分代码)

 while (isRunning) {
            ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
            if (records != null && records.count() > 0) {
           
            for (ConsumerRecord<KEY, VALUE> record : records) {
                dealMessage(bizConsumer, record.value());
                try {
                    //records记录全部完成后,才提交
                      consumer.commitSync();
                } catch (CommitFailedException e) {
                      logger.error("commit failed,will break this for loop", e);
                        break;
                }
            }
}

poll()方法该方法轮询返回消息集,调用一次可以获取一批消息。

kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用消费者的轮询方法。每次轮询到消息,在处理完这一批消息后,才会继续下一次轮询。但如果一次轮询返回的结构没办法及时处理完成,会有什么后果呢?服务端约定了和客户端max.poll.interval.ms,两次poll最大间隔。如果客户端处理一批消息花费的时间超过了这个限制时间,服务端可能就会把消费者客户端移除掉,并触发rebalance

拉取偏移量与提交偏移量

kafka的偏移量(offset)是由消费者进行管理的,偏移量有两种,拉取偏移量(position)与提交偏移量(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka会使用拉取偏移量的值作为分区的提交偏移量发送给协调者。
如果没有提交偏移量,下一次消费者重新与broker连接后,会从当前消费者group已提交到broker的偏移量处开始消费。
所以,问题就在这里,当我们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。所以拉取偏移量没有提交到broker,分区又rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。

解决方案

1.增加max.poll.interval.ms处理时长

kafka消费者 默认此间隔时长为300s,本次故障是300s都没处理完成,于是改成500s。

max.poll.interval.ms=500000

2.设置分区拉取阈值

kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用轮询方法poll()。每次轮询后,在处理完这一批消息后,才会继续下一次的轮询。

max.poll.records = 50

3.poll到的消息,处理完一条就提交一条,当出现提交失败时,马上跳出循环,这时候kafka就会进行rebalance,下一次会继续从当前offset进行消费。

 while (isRunning) {
            ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
            if (records != null && records.count() > 0) {
           
            for (ConsumerRecord<KEY, VALUE> record : records) {
                dealMessage(bizConsumer, record.value());
                try {
                    //records记录全部完成后,才提交
                      consumer.commitSync();
                } catch (CommitFailedException e) {
                      logger.error("commit failed,will break this for loop", e);
                        break;
                }
            }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/71384.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!