关于Kafka常见的一些问题

导读:本篇文章讲解 关于Kafka常见的一些问题,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1. kafka为什么要与zookeeper一起使用?

Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到Zk上的,否则Zk就疯了。kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)Producer端使用zookeeper用来”发现”broker列表,以及和Topic下每个partition的leader建立socket连接并发送消息。也就是说每个Topic的partition是由Lead角色的Broker端使用zookeeper来注册broker信息,以及监测partition leader存活性.Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。

2. kafka中最重要的组件是哪些?

主题(topic) 、分区(partition)、生产者(Producers)、消费者(Consumers)、经纪人(Brokers)

3. 什么是消费者组 为什么有什么用?

consumer
group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。个人认为,理解consumer group记住下面这三个特性就好了:
(1)consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程;
(2)group.id是一个字符串,唯一标识一个consumer group;
(3)consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)。

4. zookeeper在kafka中起什么作用?

(1)leader 选举 和 follower 信息同步
(2)Broker注册
(3)Topic注册
(4)生产者的负载均衡
(5)消费者的负载均衡
(6)记录分区与消费者的关系

5. 没有zookeeper kafka可以运行吗?

不行,kafka的设计中就依赖于zookeeper

6. kafka编程是最重要的api有几个?

Producer的API、Consumer的API、Kafka Hadoop Consumer API

7. kafka中leader flower 是什么意思?

leader是领导者、flower是跟随者

8. 什么叫isr?

leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护

9. kafka启动流程是什么?

加载配置文件 -》赋值给serverProps -》启动了一个内部的监控服务 -》kafkaServerStartable.startup-》,首先判断是否目前正在关闭中或者已经启动了,这两种情况直接抛出异常。然后是一个CAS的操作isStartingUp,防止线程并发操作启动,判断是否可以启动。如果可以启动,就开始我们的启动过程。
• 构造Metrics类 • 定义broker状态为启动中starting • 启动定时器kafkaScheduler.startup()
• 构造zkUtils:利用参数中的zk信息,启动一个zk客户端
• 启动文件管理器:读取zk中的配置信息,包含consumer_offsets和__system.topic。重点是启动一些定时任务,来删除符合条件的记录(cleanupLogs),清理脏记录(flushDirtyLogs),把所有记录写到一个文本文件中,防止在启动时重启所有的记录文件(checkpointRecoveryPointOffsets)。

首先判断是否目前正在关闭中或者已经启动了,这两种情况直接抛出异常。然后是一个CAS的操作isStartingUp,防止线程并发操作启动,判断是否可以启动。如果可以启动,就开始我们的启动过程。
• 构造Metrics类 • 定义broker状态为启动中starting • 启动定时器kafkaScheduler.startup()
• 构造zkUtils:利用参数中的zk信息,启动一个zk客户端
• 启动文件管理器:读取zk中的配置信息,包含consumer_offsets和__system.topic。重点是启动一些定时任务,来删除符合条件的记录(cleanupLogs),清理脏记录(flushDirtyLogs),把所有记录写到一个文本文件中,防止在启动时重启所有的记录文件(checkpointRecoveryPointOffsets)。

10. kafka 接到的信息最大多大 是否可以更改?

Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
针对这个问题,有以下几个建议: • 最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。
• 第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
• 第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数: broker 配置:
message.max.bytes (默认:1000000) –
broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。
• log.segment.bytes (默认: 1GB) –
kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
• replica.fetch.max.bytes (默认: 1MB) –
broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
consumer 配置: fetch.message.max.bytes (默认 1MB) –
消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。
• 性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。
• 可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的 fetch.message.max.bytes 指定了最大消息需要的内存空间,同样,分区数*最大需要内存空间,不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。
• 垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。
一切的一切,都需要在权衡利弊之后,再决定选用哪个最合适的方案。
参考配置如下: 1 2 3 4
replica.fetch.max.bytes=4194304
message.max.bytes=4000000
compression.codec=snappy
max.partition.fetch.bytes=4194304

11. kafka的cluster 是什么意思

kafka的集群

12. kafka怎么调优

Broker参数配置 1、网络和io操作线程配置优化
#broker处理消息的最大线程数(默认为3) num.network.threads=cpu核数+1
#broker处理磁盘IO的线程数 num.io.threads=cpu核数*2

2、log数据文件刷盘策略
#每当producer写入10000条消息时,刷数据到磁盘 log.flush.interval.messages=10000
#每间隔1秒钟时间,刷数据到磁盘 log.flush.interval.ms=1000

3、日志保留策略配置
#保留三天,也可以更短 (log.cleaner.delete.retention.ms) log.retention.hours=72
#段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件
log.segment.bytes=1073741824

4、Replica相关配置 default.replication.factor:3
#这个参数指新创建一个topic时,默认的Replica数量,Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。
Java API调优 1、zookeeper.session.timeout.ms
解释:配置的超时时间太短,Zookeeper没有读完Consumer的数据,连接就被Consumer断开了! 参数:5000

2、zookeeper.sync.time.ms 解释:ZooKeeper集群中leader和follower之间的同步的时间
参数:2000

3、auto.commit.enable=true
解释:注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交

4、auto.commit.interval.ms 解释:自动提交offset到zookeeper的时间间隔 参数:1000

5、zookeeper.connection.timeout.ms 解释:确认zookeeper连接建立操作客户端能等待的最长时间
参数:10000

6、rebalance.backoff.ms 解释:消费均衡两次重试之间的时间间隔 参数:2000

7、rebalance.max.retries 解释:消费均衡的重试次数 参数:10

13. kafka的缺点

重复消息:kafka保证每条消息至少送达一次,虽然几率很小,但一条消息可能被送达多次
消息乱序:kafka某一次固定的partition内部的消息是保证是序的,如果是个Topic有多个Partition,Partition之间的消息送达不保证有序
复杂性:Kafka需要Zookeeper的支持,Topic一般人工创建,部署和维护比一般的MQ成本高

本篇博客记录了我在学习Kafka过程中的一些问题,可能有些问题总结不到位,望见谅,有关Kafka基础面试题也可浏览这篇博客:kafka基础面试题

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13713.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!