快速全方位掌握kafka

kafka是一种高吞吐量(几万、几十上百万)的分布式基于发布/订阅的消息队列。相比较其他消息中间件(RocketMQ)的特点是吞吐量大,可以忍受少量消息丢失,所以在大数据实时处理领域得到广泛使用。

kafka架构

主要由Broker、Producer、Consumer、Consumer Group、Topic、Partition构成。

快速全方位掌握kafka

kafka核心概念

  • Broker:是kafka的服务节点,即kafka的服务器。
  • Topic: kafka中的消息以Topic为单位进行划分,生产者将消息发送到特定的Topic,而消费者负责订阅Topic的消息并进行消费。
  • Partition:Topic物理上分组,它可以分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个追加的日志文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量。
  • Segment:partition物理上由多个segment组成,每个segment存着message信息。
  • Offset:offset是消息在分区中的唯一标识,kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,kafka保证的是分区有序性而不是主题有序性。
  • Replica:保证数据高可用的方式,kafka同一partition的数据可以在多Broker存在多个副本,通常只有主副本对外提供读写服务,当主副本所在broker宕机,kafka会在controller的管理下重新选择新的Leader副本对外提供读写服务。
  • Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到kafka。
  • Consumer:消费者,也就是接受消息的一方。消费者连接到kafka上并接收消息,进而进行相应的业务逻辑处理。
  • Consumer Group:一个消费者组可以包含一个或多个消费者。使用多分区+多消费者方式可以极大提高数据下游的处理速度,同一个消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消费消息时互不影响。kafka是通过消费组的方式来实现消息P2P模式和广播模式。
  • zookeeper:负责维护和协调broker。当kafka系统中新增了broker或者某个broker发生宕机,由zk通知生产者和消费者。生产者和消费者依据zk的broker状态信息与broker协调数据的发布和订阅任务。最新版本kafka已不需要zk。
  • Leader:每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
  • Follower:Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与leader保持数据同步。如果leader失效,则从follower中选举出一个新的leader。当follower与leader同步太慢,leader会把这个从ISR删除,重新建一个Follower。

kafka安装模式

  • helm chart安装,分amd/arm镜像,推荐使用3 broker方式。
    • 添加kafka源:helm repo add bitnami https://charts.bitnami.com/bitnami
    • 更新kafka源:helm repo update
    • 下载kafka helm包:helm pull bitnami/kafka; 解压kafka-12.20.0.tgz文件,tar zxvf  kafka-12.20.0.tgz
    • 修改values.yaml中副本为3, helm install kafka-test ./kafka
    • 由于arm镜像官方没有推出,需要自己手动构建,可参考:https://github.com/bitnami/bitnami-docker-kafka

kafka后台操作基本指令

  • 本文指令使用broker连接信息,不采用zk。进入/opt/bitnami/kafka/bin目录下有对应指令
  • 创建主题:./kafka-topics.sh –create –bootstrap-server ip1:9092,ip2:9092,ip3:9092 –replication-factor 2 –partitions 3 –topic test
  • 查看主题:./kafka-topics.sh –list –bootstrap-server ip1:9092,ip2:9092,ip3:9092
  • 主题详情:./kafka-topics.sh –describe –bootstrap-server ip1:9092,ip2:9092,ip3:9092 –topic test
  • 增加topic分区数:./kafka-topics.sh –bootstrap-server ip1:9092,ip2:9092,ip3:9092 –alter –topic topicName –partitions 8
  • 删除topic,需要在server.properties中配置delete.topic.enable=true;./kafka-topics.sh –bootstrap-server ip1:9092,ip2:9092,ip3:9092 –delete –topic test
  • 查看topic某分区偏移量:./kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list ip1:9092,ip2:9092,ip3:9092 –topic test
  • 生产者生产数据:./kafka-console-producer.sh –broker-list ip1:9092,ip2:9092,ip3:9092 –topic test
  • 消费者消费数据:./kafka-console-consumer.sh –from-beginning –topic test –bootstrap-server ip1:9092,ip2:9092,ip3:9092

Go对应的kafka api接口

由于kafka官方没有支持go client,本文主要使用sarama包下的go client。

  • 操作kafka对应的client:client, err := sarama.NewClient(brokerList, config)

    • brokerList为kafka中broker地址,用逗号隔开;config为对应的配置文件,sarama.NewConfig()设置。
  • 管理员client:clusterAdmin, err := sarama.NewClusterAdminFromClient(client)

  • 消费者client: consumerClient, err := sarama.NewConsumerFromClient(clinet)

  • 消费组client:consumerGroupClient, err := sarama.NewConsumerGroupFromClient(groupID,clinet)

  • 生产者client:producerClient, err := sarama.NewSyncProducerFromClient(client)

  • 偏移量管理client:offsetClient, err := sarama.NewOffsetManagerFromClient(group,client)

  • 创建topic:clusterAdmin.CreateTopic(topicName, topicDetail, false)

  • topic详情:sarama.TopicMetadata, err := clusterAdmin.DescribeTopics([]string)

  • 列出topic:topicDetails, err := clusterAdmin.ListTopics()

  • 删除topic:clusterAdmin.DeleteTopic(topicName)

  • 列出某Topic对应的消费组

    • 找到消费组:groupMap, err := clusterAdmin.ListConsumerGroup()
    • 消费组详情:descriptions, err := clusterAdmin.DescribeConsumerGroups(groups)
    • 根据topic过滤过去GroupID
  • 根据topicName列出partition:

    • 获取partions的:ids, err := client.Partions(topicName)
    • 获取某topic某partition下的偏移量offSet,此处为下一个偏移量的值:maxOffset, err := client.GetOffset(topicName,partitionID,sarama.OffsetNewest)
  • 某topic、某partion下,发送消息:pid, offSet, err := producerClient.SendMessage(msg *ProducerMessage)

  • 按offset查询消息:saramaMessages, err := consumerClient.ConsumePartition(topic, partition, offset)

  • 按时间查询消息,先通过时间获取对应的offSet:startOffset, err := client.GetOffset(topic,partition,startTime)

    • 获取消息:messages, err := consumerClient.ConsumePartition(topic,partition,startOffset)
  • 创建消费组查询消息:saramaMsgs, err := consumerGroupClient.Consume(ctx,topics,&consumer)

  • 获取某topic某partition某offset的消费消息;当消息已消费,此处报错

    • messages, err := sarama.Consumer.ConsumePartition(topic, partition, offset)
  • 更加topicName和PartitionID获取偏移量:nextOffset, _, err := offsetClient.GetPartionOffset(topicName,PartitionID)

    • 重置offset:partitionManager, err := offsetClient.ManagePartition(topic,partition); partitionManage.ResetOffset(offset,metadata)

kafka数据索引机制

  • topic在物理层面以partition为分组,一个topic可以分成若干个partition。partition可以细分为segment,一个partiton物理上由多个segment组成。
  • Logsegment文件由“.index”文件和“.log”组成,分别为索引文件和数据文件。
    • partition全局的第一个segment从0开始,后续每个segment文件名最后一条消息的offset值。
    • 数值大小为64位,20位数字字符长度,没有数字用0填充。
    • 第一个segment:00000000000000000000.index和00000000000000000000.log
    • 第二个segment,为最后一条offset组成:00000000000000170410.index。
  • 索引文件以稀疏索引的方式构造消息的索引。
  • 偏移量索引和时间戳索引根据二分查找法来定位。
  • 检索查询只是Kafka的一个辅助功能,不需要为了这个功能而去花费特别太的代价去维护一个高level的索引。
快速全方位掌握kafka

如何通过 offset 找到 某一条消息呢?

  • 首先会根据 offset 值去查找 Segment 中的 index 文件,因为 index 文件是以上个文件的最大 offset 偏移命名的所以可以通过二分法快速定位到索引文件。
  • 找到索引文件后,索引文件中保存的是 offset 和对应的消息行在 log 日志中的存储行号,因为 Kafka 采用稀疏矩阵的方式来存储索引信息,并不是每一条索引都存储,所以这里只是查到文件中符合当前 offset 范围的索引。
  • 拿到 当前查到的范围索引对应的行号之后再去对应的 log 文件中从 当前 Position 位置开始查找 offset 对应的消息,直到找到该 offset 为止。

参考

https://kafka.apachecn.org/documentation.html

原文始发于微信公众号(云原生内经):快速全方位掌握kafka

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/167930.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!