vivo 移动互联网为全球 4 亿 + 智能手机用户提供互联网产品与服务。其中,vivo 分布式消息中间件团队主要为 vivo 所有内外销实时计算业务提供高吞吐、低延时的数据接入、消息队列等服务,覆盖应用商店、短视频、广告等业务。业务集群已达每天十万亿级的数据规模。
图 1. vivo 分布式消息中间件系统架构
上图为系统的整体架构,其中数据接入层包括数据接入、数据采集服务,支持 SDK 直连;消息中间件由 Kafka 和 Pulsar 共同承担,其中 Pulsar 的承载量达到千亿级别;数据处理部分使用 Flink、Spark 等组件。
目前,Kafka 采用多集群方式,根据不同的业务量级、重要性分别使用不同的集群提供服务,比如计费集群、搜索集群、日志集群。在 Kafka 集群的内部,则采用物理隔离的方式,根据不同业务的重要性,将不同业务的 Topic 控制在不同的资源组内,避免业务之间相互影响。
图 2. Kafka 集群资源隔离
图 3. Kafka 集群流量均衡
资源组内部则会针对 Topic 流量、分区分布、磁盘容量、机器机架等指标生成迁移计划进行流量均衡,以此增强 Kafka 可靠性。目前 Kafka 已在多集群部署、资源隔离、流量均衡三个方面保障了基本的稳定性和资源利用率,但是在此之外,系统仍存在一些问题。
过去几年来,Kafka 集群承载的业务量迅速增长,流量上涨数十倍,带来诸多问题:
-
Topic 及 Topic 分区总量不断增加,集群性能受到影响:Kafka 高性能依赖于磁盘的顺序读写,磁盘上大量分区导致随机读写加重;
-
业务流量增加迅速,存量集群变大,需要将老的业务进行资源组隔离迁移或者集群拆分。无论是资源组隔离还是集群的隔离的方式,由于集群不可以进行动态扩缩容,机器不能够进行灵活调配,都存在利用率不高、运维成本增加的问题;
-
机器扩容慢,需要做长时间流量均衡,难以应对突发流量。集群规模越大,问题越突出;
-
消费端性能扩展太依赖分区扩容,导致集群元数据疯狂增长;
-
集群数量对应的机器基数大,硬件故障概率高,出现硬件故障时影响会直接传导到客户端,缺少中间层容错。
面对庞大的集群、流量和多样化的业务场景,综合考虑集群的稳定性和维护成本等因素,vivo 需要一个功能更丰富、适用更多场景、扩展能力更强的消息组件。
Pulsar 如何解决 vivo 存在的问题,可以首先看一下 Pulsar 的架构设计。Pulsar 采用计算存储层分离架构。计算层的 Broker 节点是对等且无状态的,可以快速扩展;存储层使用 BookKeeper 作为节点,同样节点对等。这种分离架构支持计算和存储层各自独立扩展。
图 4. Pulsar 存储计算分离
其次,Pulsar 的各个节点都是轻量化的,在出现故障和宕机时可以快速恢复。一般情况下可以通过快速上下线来解决某个节点机器的问题。同时 Broker 层可以作为 BookKeeper 层的容错层,可以防止故障直接传导至用户端。
Pulsar 扩容时无需长时间的数据迁移,且支持实时均衡。Broker 层抽象了 Bundle 概念,可以用有限的 Bundle 映射海量 Topic,Topic 可以随着 Bundle 迁移,通过动态迁移 Bundle 可以更好地应对流量突发场景。BookKeeper 分层分片的架构让数据分布均匀,在 Broker 层有一个选择机制,在扩容时可以将数据写入存储量小的节点,扩容时无需数据迁移,提供更好的流量高峰应对能力。Bookie 进行数据刷盘时会对批量数据自动进行数据排序,可以避免 Kafka 中的随机读写。
Pulsar 提供了四种消息模型:Exclusive、Failover、Shared 和 Key_Shared,其中 Shared 模型允许一个分区同时被多个消费实例订阅消费,并采用 Round Robin(轮询)方式将数据推送到各个消费实例。因此消费能力的扩展不会过于依赖分区扩容,慢消费的消费实例也可以在 Shared 模型中得到解决。Key_Shared 模型则是在 Shared 的基础上对应对顺序性有要求的场景,可以按照 Key 来消费。
图 5. Pulsar 订阅模型
Pulsar 的设计架构带来了海量分区支撑、消费扩展、精准限流、流量均衡、快速扩缩容、故障恢复、分层存储、云原生容器部署、异地多活等特性和优势,可以帮助集群更好地实现高可用、高扩展,提高了更高的弹性。
下面我们从流量控制和数据管理方面,分享 vivo 在使用 Pulsar 过程中的集群管理经验。
在集群流量控制层面,比较关键的一点就是 Bundle 的管理。Bundle 负责控制用户流量到 Broker 的具体分布。Broker 与 Topic 之间没有直接联系,而是在 Broker 之上抽象出 Bundle 概念,通过 Bundle 与 Topic 建立关系;Topic 通过名称计算哈希值,并散列分布到一致性哈希环中,而哈希环的每一段都是一个 Bundle。另外 Load Manager 根据 Bundle 的负载情况将后者分配到对应的 Broker 上,将 Bundle 数据存储在 ZooKeeper 中。由此以来就间接实现了 Topic 与 Broker 之间的联系(可参考近期 StreamNative 发布的 Broker 负载均衡技术文章)。
图 6. Bundle 与 Topic 建立关系
这里需要注意:
-
Bundle 的个数影响均衡效果,因为通过一致性哈希来确认 Topic 应该落在哪个 Bundle 上, Topic 与 Bundle 会存在不均衡分配,某些 Bundle 分配的 Topic 可能较多或较少。Bundle 越多,每个 Bundle 承载的 Topic 越少,粒度越细。依赖于 Pulsar 的负载均衡算法,均衡效果更好;否则若 Bundle 太大,无论如何卸载都很难平衡负载;
-
Bundle 数据和 Broker 映射元数据都维护在 ZooKeeper 中,需要做好 Bundle 数量的规划。
针对以上两点,我们根据 Broker 来设置 Bundle 数量设置最小最大值来控制,还可以对流量较大的 Topic 针对性扩大分区,让分区均匀分配到 Broker Bundle 上。
Pulsar 虽然提供了海量分区能力,但是过多的 Topic 或者分区产生的 lookup 也会对集群产生较大的压力。集群管理者需要提前规划 Bundle 和分区设置,杜绝滥用。
另外对 Bundle 的操作需要注意:
-
Pulsar 本身提供了卸载操作,可以解除 Bundle 和 Broker 的关联关系,将 Bundle 重新分配。线上流量较大时应卸载 Bundle 而不是整个命名空间,因为卸载后者会导致其上的全部 Bundle 与对应的生产者、消费者断开,重新进行 lookup。
-
利用 Bundle split 对流量较大的 Bundle 进行拆分,增加命名空间的 Bundle 数量,降低影响。
总体而言,用户需要注意流量的均衡与集群的稳定性,在集群管理之初就做好 Bundle 的数量管理和相关测试,谨慎对待大批量 Bundle 卸载等运维操作。
接下来我们从数据的存储、过期、删除三个方面来分析。
首先介绍数据写入 ledger 的过程。每一个 Topic 分区在一段时间内只创建一个 Ledger 维护分区写入的 Entry 的数据归属。Topic 分区写入的数据以 Entry 的形式,经过 Broker 写入 Netty 线程处理队列,线程依次根据 Entry 的 Ledger Id,对 Ledger 目录数取模,写入到目标磁盘 Ledger 目录,最终以 Entry Log 和 RocksDB 的索引方式存储。需要注意,Ledger 是一个分区在一段时间内写入数据的逻辑管理单位,维护了这段数据存储的 Bookie 位置。一个 Topic 分区在一段时间内写入的数据只被一个活跃 Ledger 管理,待该 Ledger 达到翻转条件后才会关闭 Ledger 并重新计算,创建新 Ledger 继续写入。
图 7. Ledger 翻转示意
Ledger 翻转后,数据才会写入新的数据目录。在 Pulsar 中,在满足 Ledger 最小翻转时间以及以下条件之一后触发 Ledger 翻转:
-
已达到 Ledger 最大翻转时间;
-
已达到 Ledger 的最大 Entry 数量;
-
已达到 Ledger 的最大大小。
默认值:
触发ledger翻转的最小时间:
managedLedgerMinLedgerRolloverTimeMinutes=10
触发ledger翻转的最长时间:
managedLedgerMaxLedgerRolloverTimeMinutes=240
触发ledger翻转的最大entry数:
managedLedgerMaxEntriesPerLedger=50000
触发ledger翻转的最大大小:
managedLedgerMaxSizePerLedgerMbytes=2048
注意两个问题:
-
Ledger 过大:最小翻转时间是防止 Ledger 元数据过快增长的手段,但实践发现如果 Topic 分区流量较大,Ledger 的实际值可能远超上述设置的上限阈值。Ledger 只有在翻转后才会创建新的 Ledger,Ledger 过大会导致某段时间内写入某个磁盘的数据过多,产生磁盘存储不均衡的问题;针对 Ledger 为对象的一些操作也会受到影响,产生无法及时卸载数据到二级存储、数据卸载时间较长、还未卸载成功但 Ledger 已经过期等问题。
-
Ledger 间不均衡:Ledger ID 以集群维度进行递增。在分区的维度,按照 Ledger ID 对 Ledger 存储目录数进行取模的方式无法对多磁盘进行均衡写入。但保持 Ledger 间的大小一致,在一定程度上会对多磁盘目录的写入均衡有比较大的改善。
总而言之,建议根据业务消息情况适当调整 Ledger 翻转参数和有针对性地增加大流量 Topic 分区数量,可以防止 Ledger 过大、大小不均衡的问题。
数据过期主要分为四个阶段:
第一阶段:未被 Ack 的消息
-
Backlog 消息:该段数据不会被删除
第二阶段:已经 Ack 的消息
-
订阅主动 Ack 后,标记为非 backlog 消息,有多个订阅时以最慢的为准
-
TTL:若某 Topic 没有活跃订阅,超过 TTL 存活时间的消息会被主动 Ack ,本质上是移动 cursor
第三阶段:消息保留时间检查
-
Retention:对已经 Ack 的消息的保留策略,按保留周期和保留大小设置来保留消息
第四阶段:消息删除
-
Deleted:超过 Retenion 范围的消息则被删除。超过 rentention 保留周期和保留大小的消息,系统会从当前已经 ack 消息的最新位置往前检查并获取已经过期的 ledger,将其标记删除。
图 8. 消息保留时间检查与消息删除
从上述的消息阶段演化来看,Pulsar 提供了较大的消息管理空间,但也略显复杂。建议集群维护者建立简单统一的规则处理数据保留策略,如可以设置 TTL = Retention 保留周期值。
此处介绍数据的物理删除。Bookie 在处理数据写入过程时,会将同一段时间内的数据经过排序 flush 到同一个 Entry Log 文件中,将索引存放在 RocksDB 中。由于多个 Ledger 的数据可能会同时写入同一个 Entry Log,因此 Entry Log 便不能被简单直接的删除。对此 BookKeeper 会启动一个 GC(GarbageCollector) 线程进行检查和物理删除操作。
图 9. 数据物理删除流程
Entry Log 维护元数据信息( EntryLogMetadata),该元数据记录了 Ledger 列表、大小与剩余有效数据比例。
GC 清理线程在每个 gcWaitTime 时间间隔:
-
扫描 Entry Log 的元数据信息,对于已经没有有效数据的 entry log 直接进行删除。
-
判断是否满足 compaction 条件,满足 compaction 条件后 GC 线程会读取每一个 Entry 判断其是否过期,一旦过期就会丢弃,否则会将数据写入新的 Entry Log。
Compaction 分为 minorCompaction 和 majorCompaction,二者区别在于阈值。默认情况下,minorCompaction 清理间隔 1 小时,阈值 0.2;majorCompaction 清理间隔 24 小时,阈值 0.8。阈值是 Entry Log File 中的剩余有效数据占比。
minorCompactionInterval=3600
minorCompactionThreshold=0.2
majorCompactionThreshold=0.8
majorCompactionInterval=86400
在实际使用中,如果机器节点的磁盘较小且数据迟迟得不到删除,为了及时清除数据,应该按照业务流量和磁盘空间适当调整数据清理间隔时间、有效数据阈值,并配合 compaction 限速策略减小对集群的影响。
vivo 的 Pulsar 指标监控链路架构如下:
图 10. vivo 针对 Pulsar 监控指标搭建的监控架构
该架构中:
-
采用 Prometheus 采集 Pulsar 指标;
-
应用 Prometheus 远程存储特性将格式化后的指标发送到 Kafka;
-
Druid 消费 Kafka 数据后可以作为 Grafana 的数据源,配置 Grafana 面板查询指标。
为什么不使用 Prometheus 存储数据?因为有些数据较久远,一旦集群规模增加,监控指标数量级会很大。Prometheus 对资源依赖重,我们只采用了它的采集能力。
下图是常用的关键指标:
图 11. 关键监控指标
指标类型分为:
-
客户端指标:用来排查客户端出现的异常
-
Broker 端指标:监控 topic 流量、调整 broker 间流量差距
-
Bookie 端指标:排查读写延迟等问题
除了官方指标外,团队还开发了 Bundle 相关的一些指标:
-
分区数、流量等在 Bundle 的分布
-
Broker 端记录读写延迟的 P95/P99 值
-
基于请求对列实现 Broker 端网络负载指标等。
负载均衡的目的是对资源平均分配,差异大会影响稳定性。对负载均衡设置的目标是节点流量偏差 20% 以内,每天均衡频次在 10 次以内,否则客户端会频繁断连、重连。优化后的参数如下:
# load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.Broker.loadbalance.impl.ThresholdShedder
# enable/disable namespace Bundle auto split
loadBalancerAutoBundleSplitEnabled=false
# enable/disable automatic unloading of split Bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false
#计算新资源使用量时的CPU使用权重(默认1.0)
loadBalancerCPUResourceWeight=0.0
#计算新的资源使用量时的堆内存使用权重(默认1.0)
loadBalancerMemoryResourceWeight=0.0
#计算新资源使用量时的直接内存使用权重(默认1.0)
loadBalancerDirectMemoryResourceWeight=0.0
下面三个参数改为零,是因为集群使用了相同的机型,团队更关注流量均衡,对内存和 CPU 不是特别关注。
以一个具体产品案例来看,其中有 1 个 Topic、30 个分区、180 个 Bundle:
图 12. 1 个 Topic、30 个分区、180 个 Bundle 的每秒入流量
上图节点间流量差异较大,由 Bundle unload 导致。
图 13. 1 个 Topic、30 个分区、180 个 Bundle 下,Bundle 上 Topic 分区情况
上图可看出,有两个 Bundle 分配了四个分区,远超其他 Bundle。实践中出现以下问题:
-
均衡频次高,一天大概有 200 多次
-
客户端连接频繁切换,流量波动大
-
每个 Bundle 的分区数量分布差异大
图 14. 1 个 Topic、30 个分区、180 个 Bundle 的入流量分布
优化过程中,关键在于将分区打散到不同 Bundle 上,但分区数量太少很难做到。Topic 通过哈希算法分配到 Bundle 上在前文已经介绍。此案例中,问题在于分区数量少。
于是团队将分区增加到 120 个,效果如下:
-
节点间流量差异小
-
均衡频次降低,一天大概有 10 次左右
-
客户端连接切换减少,流量波动较小
-
每个 bundle 的分区数量分布差异降低
图 15. 1 个 Topic、120 个分区、180 个 Bundle 的每秒入流量
图 16. 1 个 Topic、120 个分区、180 个 Bundle 下,Bundle 上 Topic 分区情况
图 17. 1 个 Topic、120 个分区、180 个 Bundle 的入流量分布
在和上述业务相同的场景中,分区数量增加后,系统滚动重启后出现了流量下降情况:
图 18. 单个 Topic,30 个分区增加到 120 个,系统滚动重启后流量下降
客户端配置参数:
-
memoryLimitBytes=209715200 (默认为 0)
-
maxPendingMessages=2000 (默认 1000)
-
maxPendingMessagesAcrossPartitions=40000 (默认 50000)
-
batchingMaxPublishDelayMicros=50 (默认 1 毫秒)
-
batchingMaxMessages=2000 (默认 1000)
-
batchingMaxBytes=5242880 (默认 128KB)
满足三个 batch 数据中的任何一个的情况下就会触发打包、发送。
图 19. 重启后 maxPendingMessages(队列长度)出现下降
这里 maxPendingMessages(队列长度)=min(maxPendingMessages, maxPendingMessagesAcrossPartitions/partitionNum)
。而分区数添加(30 -> 120)后,需要重启客户端才对队列长度生效。重启后 maxPendingMessages 队列长度 从 40000/30 = 1333 变为 40000/120 = 333,出现了明显下降。
另外,测试发现 batchingMaxMessages 调小后性能提升 10 倍之多:
图 20. 单个 Topic,30 个分区增加到 120 个,调整后性能提升
建议 batchingMaxPublishDelayMicros
不要过大,确保 batchingMaxMessages
比 maxPendingMessages
要大,否则等待 batchingMaxPublishDelayMicros
才会发送。
某个分区队列满后会导致发送线程阻塞,影响所有分区的整体发送和集群稳定性:
图 21. 执行 Kill-9 一台 Broker 后,其他 Broker 流量下降
图 22. 第四个分区已满,发送线程阻塞在 canEnqueRequest 上,等待时间长,其他未满分区的发送也被影响。
图 23. 极端情况下,第四个分区已满,其他分区等待中。发送线程会在第四个分区阻塞等待,其他线程无法发送。
针对这一问题的优化思路,首先是能者多劳,让发送快的分区尽可能多发送;然后是将阻塞点从 ProducerImpl 移到 PartitionedProducerImpl;如果分区 ProducerImpl 出现队列已满阻塞较长时间,就将该分区排除。
图 24. 宕机导致集群流量骤降优化思路
实践中可分为可用 Producer 和不可用 Producer 两个列表。在 ① 中,两个列表都处于初始化状态并可用;在 ② 中,某个可用分区阻塞一段时间后可以等待一段时间;若不可用就移动到不可用列表中,如 ③ 所示;当分区可用比例达到阈值再挪回可用列表,如 ④ 所示。
经过优化后,宕机 Broker 流量可以快速转移到其他 Broker:
图 25. 优化后 Broker 流量分流并上涨
注:优化只支持 RoundRobinPartitionMessageRouterImpl
路由策略。
在单个 ProducerImpl 对应的 Broker 出现处理慢、网络慢等导致发送响应慢的情况,都可能会导致发送线程阻塞,业务发送消息的速度受限于最慢的 ProducerImpl 的速度。
本文分享了 vivo 在 Pulsar 集群管理与监控的经验,并介绍 vivo 在负载均衡等方面的最佳实践。
由于服务端的问题很难通过监控指标进行分析,vivo 在未来计划实现生产端到消费端的全链路监控能力。大数据团队希望整合大数据组件,支撑 Flink、Spark、Druid 等核心下游组件打通落地。
同时,vivo 内部目前 Pulsar 与 Kafka 同时运行,团队将尝试基于 KoP 对存量 Kafka 万亿流量尝试迁移,降低 Kafka 迁移成本;并探索容器化落地,充分发挥 Pulsar 云原生优势。
-
全利民,vivo 大数据工程师,负责 vivo 分布式消息中间件建设
-
陈建波,vivo 大数据工程师,曾任微服务应用架构师,负责 vivo 分布式消息中间件的建设
原文始发于微信公众号(华仔聊技术):深度剖析 Pulsar 在大厂的探索与实践
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/39571.html