Hi everyone, I'm 一安. In earlier posts I covered RabbitMQ; today I'll walk through how to integrate Kafka with Spring Boot in real-world development to achieve high throughput.
Introduction
Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle large volumes of data and lets you pass messages from one endpoint to another.
Kafka is suitable for both offline and online message consumption. Kafka messages are kept on disk and replicated within the cluster to prevent data loss.
Kafka is built on top of the ZooKeeper synchronization service and integrates very well with Apache Storm and Spark for real-time streaming data analysis.
Kafka relies on sequential, append-only log writes, which is what makes message replay possible and underpins its high-performance reads and writes.
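For instance, a consumer can rewind a partition and re-read everything still retained in the log. Below is a minimal sketch using the plain kafka-clients API; the broker address and the test topic are the demo values used later in this article, the group id replay-sketch is arbitrary, and poll(Duration) assumes a kafka-clients 2.x dependency.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplaySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition p0 = new TopicPartition("test", 0);
            // Manually assign partition 0 and rewind to the earliest offset still on disk
            consumer.assign(Collections.singletonList(p0));
            consumer.seekToBeginning(Collections.singletonList(p0));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(2))) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}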
Basic Concepts
- Producer: the message producer, i.e. the client that sends messages to the messaging system.
- Consumer: the message consumer, i.e. the client that reads messages from the messaging system.
- Broker: a service proxy node (in a cluster) that handles messages. For Kafka, a Broker can simply be thought of as an independent Kafka instance; a Kafka cluster is made up of multiple broker nodes.
- Topic: a topic, a logical concept whose role is similar to a table in a relational database.
- Partition: a partition. For scalability, a very large topic can be spread across multiple brokers; a topic is divided into multiple partitions (located on different servers, physically stored in .log files), numbered from 0, and each partition is an ordered queue. Every message in a partition is assigned an ordered ID (its offset). Kafka only guarantees ordering within a single partition, not across the multiple partitions of a topic.
- Offset: the offset. Every message in a partition has a consecutive sequence number that uniquely identifies that message within the partition (see the sketch after this list).
- Consumer group: a consumer group. A topic can have multiple consumers, but each partition delivers its messages to only one consumer within a group.
- Leader & Follower: Leader and Follower are concepts defined per Partition. Consumers and Producers read from and write to the Leader partition, while the Followers replicate the Leader for failover. A Leader and its Followers may live on different brokers.
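To make Partition and Offset concrete: every acknowledged send reports back which partition the record landed in and its offset inside that partition. Below is a minimal sketch with the plain kafka-clients producer; the broker address and topic follow the demo later in this article, and the key order-1 is purely illustrative.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MetadataSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.128:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                // Records that share a key hash to the same partition, so their relative order is preserved
                RecordMetadata meta = producer.send(new ProducerRecord<>("test", "order-1", "msg-" + i)).get();
                System.out.printf("topic=%s partition=%d offset=%d%n", meta.topic(), meta.partition(), meta.offset());
            }
        }
    }
}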
Scenarios
- Low throughput: by default a Topic has only one Partition. Producer(s) append messages to the end of that Partition, and a single Consumer reads the messages off it in order for processing.
- High throughput:
  - To speed up writes, split the Topic into multiple partitions. A Topic's partitions are stateful, and once the partition count has been increased it cannot be reduced.
  - To speed up reads, have multiple Consumers reading from the partitions at the same time.
  - Use a Consumer Group to guarantee that each partition is read by only one Consumer. Within a Consumer Group, one Consumer may read several partitions, but a given partition is read by only one Consumer (see the sketch after this list).
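A rough sketch of that last point with the plain kafka-clients consumer: every process started from this class joins the same group, and Kafka hands each partition of test to exactly one of them, so running two or three copies spreads the three partitions across the instances. The broker address, topic, and group id mirror the demo that follows; poll(Duration) again assumes a kafka-clients 2.x dependency.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMemberSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // All instances subscribe to the same topic with the same group id;
            // the group coordinator gives each instance a disjoint subset of the partitions
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}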
Demo
Environment Setup
To keep things simple I'll just stand up a small cluster here. If you want the full details, I'll do a dedicated post later on building a Kafka cluster, including SASL authentication.
1. Create and run a zookeeper container from the image
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
2. Create and run the kafka0, kafka1, and kafka2 containers from the image
docker run -d --name kafka0 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.5.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.128:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.5.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.128:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka
docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.5.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.128:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka
3. Enter the container and create a topic
docker exec -it kafka0 /bin/bash
cd /opt/kafka_2.13-2.8.1/bin
kafka-topics.sh --create --zookeeper 192.168.5.128:2181 --replication-factor 1 --partitions 3 --topic test
4. View the newly created topic
kafka-topics.sh --describe --zookeeper 192.168.5.128:2181 --topic test
Topic: test TopicId: K8xJNqDjQfSVHPreTVNKNQ PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 1 Replicas: 1 Isr: 1
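The same check can also be done from Java. Below is a small sketch with the AdminClient API; it assumes a kafka-clients version that ships AdminClient (0.11 or later), which is newer than the client version pinned in the pom below, so treat it as an optional aside.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.5.128:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singletonList("test"))
                    .all().get().get("test");
            // Prints each partition with its leader and replica set, like kafka-topics.sh --describe
            description.partitions().forEach(p ->
                    System.out.printf("partition=%d leader=%s replicas=%s%n",
                            p.partition(), p.leader(), p.replicas()));
        }
    }
}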
Code Example
1. Add dependencies
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
2. Configuration
#============== kafka ===================
# Kafka broker addresses; multiple brokers can be listed
spring.kafka.bootstrap-servers=192.168.5.128:9092,192.168.5.128:9093,192.168.5.128:9094
#=============== producer =======================
server.port=8090
# Number of retries on send failure
spring.kafka.producer.retries=0
# Producer batch size in bytes
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.properties.sasl.mechanism=PLAIN
#spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
#=============== consumer =======================
kafka.consumer.topic=test
# Default consumer group id
spring.kafka.consumer.group-id=test-consumer-group
# auto-offset-reset rules:
# (1) earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
# (2) latest: if a partition has a committed offset, consume from it; otherwise consume only newly produced data
# (3) none: if all partitions have committed offsets, consume from them; if any partition has no committed offset, throw an exception
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
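The producer side is not shown in this article, but with the producer properties above Spring Boot auto-configures a KafkaTemplate that can be injected directly. Below is a minimal, illustrative sketch for sending test messages; the controller class name and the /send path are made up for this example, and it assumes a web starter is on the classpath (which the server.port setting suggests).
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerSketchController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ProducerSketchController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // GET /send?msg=hello publishes the message to the "test" topic;
    // with no key the record is not tied to any one partition
    @GetMapping("/send")
    public String send(@RequestParam String msg) {
        kafkaTemplate.send("test", msg);
        return "sent";
    }
}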
3. Configure the consumer
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Data
public class PropsConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String broker;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
}

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Autowired
    PropsConfig propsConfig;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumer threads; each thread is its own Kafka consumer
        factory.setConcurrency(2);
        // Enable batch consumption; the batch size is capped by ConsumerConfig.MAX_POLL_RECORDS_CONFIG below
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        // Maximum number of records returned by a single poll, i.e. the upper bound of one batch
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        // propsMap.put("security.protocol", "SASL_PLAINTEXT");
        // propsMap.put("sasl.mechanism", "PLAIN");
        return propsMap;
    }

    @Bean
    public MyListener listener() {
        return new MyListener();
    }
}
Two listener styles are combined here: one that does not specify a consumer id or partitions, and one that binds a listener with an explicit id to specific partitions.
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;

public class MyListener {

    private static final Logger log = LoggerFactory.getLogger(MyListener.class);
    private static final String TOPIC = "test";

    // Listen to one or more topics without binding to specific partitions
    @KafkaListener(topics = {"#{'${kafka.consumer.topic}'.split(',')}"})
    public void listenPartition(List<ConsumerRecord<?, ?>> records) {
        log.info("Id Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id records size " + records.size());
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                int partition = record.partition();
                log.info("topic {} partition {} Received message={}", topic, partition, message);
            }
        }
    }

    // Listener bound to partition 0 only (uncomment the annotation to use the partition-specific style)
    // @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " + records.size());
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("topic {} p0 Received message={}", topic, message);
            }
        }
    }

    // Listener bound to partition 1 only
    // @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " + records.size());
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("topic {} p1 Received message={}", topic, message);
            }
        }
    }

    // Listener bound to partition 2 only
    // @KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = TOPIC, partitions = { "2" }) })
    public void listenPartition2(List<ConsumerRecord<?, ?>> records) {
        log.info("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id2 records size " + records.size());
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("topic {} p2 Received message={}", topic, message);
            }
        }
    }
}
4. Start the project and inspect the consumer group
1. View the Consumer Group details inside the container
kafka-consumer-groups.sh --bootstrap-server 192.168.5.128:9092,192.168.5.128:9093,192.168.5.128:9094 --describe --group test-consumer-group
2. With setConcurrency(2), one of the consumers is assigned two partitions
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-consumer-group test 0 0 0 0 consumer-1-a22a56ed-7324-44f9-9a60-e47c21e4b0b2 /192.168.5.1 consumer-1
test-consumer-group test 1 0 0 0 consumer-1-a22a56ed-7324-44f9-9a60-e47c21e4b0b2 /192.168.5.1 consumer-1
test-consumer-group test 2 1 1 0 consumer-2-bad2eb15-030b-4dcd-a6d8-1140bcccc90e /192.168.5.1 consumer-2
3. With setConcurrency(3), the partitions are distributed evenly, one per consumer
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-consumer-group test 0 0 0 0 consumer-1-b49eb667-7248-4e76-a600-635e83dc868e /192.168.5.1 consumer-1
test-consumer-group test 2 1 1 0 consumer-3-2c8f0eb4-cf0f-4854-965a-82ee97fc1a1e /192.168.5.1 consumer-3
test-consumer-group test 1 0 0 0 consumer-2-0bb8f9eb-5b29-48ce-bbf6-c8812a7864b0 /192.168.5.1 consumer-2
Note: when sizing partitions and Consumers, always make the number of partitions >= the number of Consumers, so that every Consumer reads at least one partition and no Consumer sits idle; otherwise the extra Consumers do nothing and the throughput goal is not achieved.
Extra! Extra!
If this article helped or inspired you, please like, share, and bookmark it. Your support is what keeps me going!