Integrating Kafka with Spring Boot to Implement Batch Consumption


In Spring Boot, batch pulling and consumption of Kafka data can be enabled with a small amount of configuration. First, modify the application's YAML configuration file; the settings from my test service are listed below. The batch-related knobs are max-poll-records, which caps how many records one poll (and therefore one batch handed to the listener) may return, and fetch-min-size / fetch-max-wait, which tell the broker to wait up to fetch-max-wait milliseconds for at least fetch-min-size bytes of data before answering a fetch request:

spring:
  kafka:
    bootstrap-servers: 192.168.1.101:9090,192.168.1.102:9090,192.168.1.103:9090
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: batch_consumer_group
      auto-offset-reset: earliest
      max-poll-records: 10240
      max-poll-interval-ms: 10000
      enable-auto-commit: false
      auto-commit-interval: 5000
      fetch-min-size: 1024000
      fetch-max-wait: 30000
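
For testing, a minimal standalone producer matching the serializer settings above could look like the following (a sketch: the class name and message contents are illustrative; keys are Longs to match the consumer's LongDeserializer, and "batch-messages" is the topic consumed later in this article):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9090,192.168.1.102:9090,192.168.1.103:9090");
		// serializers mirror the consumer's Long/String deserializers
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		try (KafkaProducer<Long, String> producer = new KafkaProducer<>(props)) {
			for (long i = 0; i < 1000; i++) {
				producer.send(new ProducerRecord<>("batch-messages", i, "message-" + i));
			}
		}
	}
}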

On the project's main application class, exclude the Kafka auto-configuration (the consumer configuration is supplied manually below):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

@SpringBootApplication(exclude = { KafkaAutoConfiguration.class })
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

Create a Kafka consumer configuration class that wires up these settings:

import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.Map;

/**
 * @author xingo
 * @date: 2022/1/7
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;
	@Value("${spring.kafka.consumer.enable-auto-commit}")
	private Boolean enableAutoCommit;
	@Value("${spring.kafka.consumer.auto-commit-interval}")
	private Integer autoCommitInterval;
	@Value("${spring.kafka.consumer.group-id}")
	private String groupId;
	@Value("${spring.kafka.consumer.max-poll-records}")
	private Integer maxPollRecords;
	@Value("${spring.kafka.consumer.max-poll-interval-ms}")
	private Integer maxPollIntervalMs;
	@Value("${spring.kafka.consumer.auto-offset-reset}")
	private String autoOffsetReset;
	@Value("${spring.kafka.consumer.key-deserializer}")
	private String keyDeserializer;
	@Value("${spring.kafka.consumer.value-deserializer}")
	private String valueDeserializer;
	@Value("${spring.kafka.consumer.fetch-min-size}")
	private Integer fetchMinSize;
	@Value("${spring.kafka.consumer.fetch-max-wait}")
	private Integer fetchMaxWait;

	/**
	 *  Consumer configuration properties
	 */
	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = Maps.newHashMap();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
		props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
		props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinSize);
		// fetch-max-wait is a time in milliseconds, so it maps to FETCH_MAX_WAIT_MS_CONFIG
		props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
		props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);

		return props;
	}

	/**
	 *  Batch consumer container factory
	 */
	@Bean
	public KafkaListenerContainerFactory<?> batchConsumeFactory() {
		ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
		// number of consumers to create concurrently
		// factory.setConcurrency(5);
		// enable batch consumption; the per-batch record count is capped by ConsumerConfig.MAX_POLL_RECORDS_CONFIG
		factory.setBatchListener(true);
		factory.getContainerProperties().setPollTimeout(15000);

		return factory;
	}
}
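
One point worth noting: because enable-auto-commit is false, offset commits are handled by the Spring listener container rather than by the Kafka client. The container's default AckMode is BATCH, which commits each batch's offsets after the listener returns, so the factory above needs no extra settings. If you prefer explicit acknowledgments instead, here is a minimal sketch of an alternative factory bean that could be added to the class above (the bean name is illustrative, and it additionally requires importing org.springframework.kafka.listener.ContainerProperties):

	@Bean
	public KafkaListenerContainerFactory<?> manualAckBatchFactory() {
		ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
		factory.setBatchListener(true);
		// listeners using this factory must call Acknowledgment.acknowledge() themselves
		factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
		return factory;
	}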

Finally, create a consumer listener class to receive the batched records:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaConsumer {

	/**
	 * Consume data in batches
	 * @param records the records returned by one poll
	 */
	@KafkaListener(topics = "batch-messages", containerFactory = "batchConsumeFactory")
	public void listen(List<ConsumerRecord<Long, String>> records) {
		try {
			System.out.println("Records pulled in this batch: " + records.size());
			for (ConsumerRecord<Long, String> record : records) {
				System.out.println(record.value());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
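
If you went with the manual-ack factory sketched earlier, the listener takes an extra Acknowledgment parameter and commits the batch itself. A sketch under that assumption (class and method names are illustrative):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

	@KafkaListener(topics = "batch-messages", containerFactory = "manualAckBatchFactory")
	public void listen(List<ConsumerRecord<Long, String>> records, Acknowledgment ack) {
		records.forEach(record -> System.out.println(record.value()));
		// commit the offsets of the whole batch in a single call
		ack.acknowledge();
	}
}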
