In Spring Boot, batch pulling and consumption of Kafka data can be implemented with a small amount of configuration. First, modify the application's YAML configuration file; the configuration of my test service is as follows:
spring:
  kafka:
    bootstrap-servers: 192.168.1.101:9090,192.168.1.102:9090,192.168.1.103:9090
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: batch_consumer_group
      auto-offset-reset: earliest
      max-poll-records: 10240
      max-poll-interval-ms: 10000
      enable-auto-commit: false
      auto-commit-interval: 5000
      fetch-min-size: 1024000
      fetch-max-wait: 30000
On the project's main application class, exclude Kafka auto-configuration, so that Spring Boot does not also create its own default consumer factory and listener container factory alongside the ones defined below:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

@SpringBootApplication(exclude = { KafkaAutoConfiguration.class })
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}
Next, create a Kafka consumer configuration class that reads the settings above:
import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.Map;
/**
* @author xingo
* @date: 2022/1/7
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;
    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private Integer maxPollIntervalMs;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.fetch-min-size}")
    private Integer fetchMinSize;
    @Value("${spring.kafka.consumer.fetch-max-wait}")
    private Integer fetchMaxWait;
    /**
     * Consumer configuration properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinSize);
        // fetch-max-wait is a wait time in milliseconds: how long the broker may
        // hold a fetch request while waiting to accumulate fetch.min.bytes
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
        return props;
    }
    /**
     * Batch consumer listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchConsumeFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Number of concurrent consumer threads
        // factory.setConcurrency(5);
        // Enable batch consumption; the maximum batch size is controlled by
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG in the consumer properties above
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(15000);
        return factory;
    }
}
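Because enable-auto-commit is false, offset commits are handled by Spring Kafka's listener container, which by default commits after each batch returned by poll() (AckMode.BATCH). If you prefer to acknowledge offsets explicitly from the listener instead, a minimal sketch of a manual-ack variant looks like the following; the bean name manualAckBatchFactory and the listener method are illustrative additions, not part of the original example:

import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;

// Factory variant: the container no longer commits offsets on its own
@Bean
public KafkaListenerContainerFactory<?> manualAckBatchFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.setBatchListener(true);
    // MANUAL_IMMEDIATE commits as soon as the listener calls ack.acknowledge()
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

// Matching listener: the whole batch is committed only after successful processing
@KafkaListener(topics = "batch-messages", containerFactory = "manualAckBatchFactory")
public void listenWithAck(List<ConsumerRecord<Long, String>> records, Acknowledgment ack) {
    for (ConsumerRecord<Long, String> record : records) {
        System.out.println(record.value());
    }
    ack.acknowledge();
}

With this variant, a failure in the middle of a batch leads to redelivery of the whole uncommitted batch rather than the offsets being committed behind the listener's back.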
Finally, create a consumer listener class; with this in place the application pulls data in batches:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class KafkaConsumer {

    /**
     * Consume records in batches
     * @param records the records returned by a single poll
     */
    @KafkaListener(topics = "batch-messages", containerFactory = "batchConsumeFactory")
    public void listen(List<ConsumerRecord<Long, String>> records) {
        try {
            System.out.println("Pulled a batch of " + records.size() + " records from the message queue");
            if (!records.isEmpty()) {
                for (ConsumerRecord<Long, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
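To feed test data into the batch-messages topic, note that excluding KafkaAutoConfiguration also disables the auto-configured KafkaTemplate, so the producer side has to be wired by hand. Below is a minimal sketch under that assumption; the class name KafkaProducerConfig and the hard-coded broker list are illustrative, and the serializers mirror the Long/String deserializers used by the consumer:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.101:9090,192.168.1.102:9090,192.168.1.103:9090");
        // Key/value serializers match the consumer's LongDeserializer/StringDeserializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}

With this template injected somewhere, a quick loop such as kafkaTemplate.send("batch-messages", (long) i, "message-" + i) for a few thousand values of i should make the listener print batches of up to max-poll-records (10240) records per poll, subject to the fetch-min-size and fetch-max-wait settings above.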