一、MongoShake简介
MongoShake是一个以go语言编写的通用的平台型服务,通过读取MongoDB集群的Oplog日志,对MongoDB的数据进行复制,后续通过操作日志实现特定需求。
MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。现有tunnel通道类型有:Direct:直接写入目的MongoDB RPC:通过net/rpc方式连接 TCP:通过tcp方式连接 File:通过文件方式对接 Kafka:通过Kafka方式对接 Mock:用于测试,不写入tunnel,抛弃所有数据 消费者可以通过对接tunnel通道获取关注的数据,例如对接Direct通道直接写入目的MongoDB,或者对接RPC进行同步数据传输等。此外,用户还可以自己创建自己的API进行灵活接入。
MongoShake对接的源数据库支持单个mongod,replica set和sharding三种模式。目的数据库支持mongod和mongos。如果源端数据库为replica set,建议使用备库以减少主库的压力;如果为sharding模式,那么每个shard都将对接到MongoShake并进行并行抓取。对于目的库来说,可以对接多个mongos,不同的数据将会哈希后写入不同的mongos。
应用场景
从MongoDB副本集同步到MongoDB副本集
从MongoDB副本集同步到MongoDB集群版
从MongoDB集群版同步到MongoDB集群版
从MongoDB副本集同步到kafka通道
云上MongoDB副本集的双向同步(自建不支持,云MongoDB内核有调整:在oplog中加入uk字段,标识涉及到的唯一索引信息)
基本特性
MongoShake提供了并行复制的能力,复制的粒度选项(shard_key)可以为:id,collection或者auto,不同的文档或表可能进入不同的哈希队列并发执行。
id表示按文档进行哈希;collection表示按表哈希;auto表示自动配置,如果有表存在唯一键,则退化为collection,否则等价于id。
按表哈希可以保证一个表内的操作的顺序一致性,但不能保证不同表之间的顺序一致性;按文档哈希可以保证一个表内对同一个文档(主键_id)的操作的顺序一致性,但不能保证对不同文档操作的顺序一致性。
HA方案 MongoShake定期将同步上下文进行存储,存储对象可以为第三方API或者源库。目前的上下文内容为“已经成功同步的oplog时间戳”。在这种情况下,当服务切换或者重启后,通过对接该API或者数据库,新服务能够继续提供服务。
此外,MongoShake还提供了Hypervisor机制用于在服务挂掉的时候,将服务重新拉起。过滤 提供黑名单和白名单机制选择性同步db和collection。
压缩 支持oplog在发送前进行压缩,目前支持的压缩格式有gzip, zlib, 或deflate。
Checkpoint Checkpoint是用于标识同步的位点信息,比如checkpoint=”2018-01-01 12:34″标识已经同步到了”2018-01-01 12:34″这个位点了,那么这个时候如果MongoShake异常退出,那么下次重启可以继续从”2018-01-01 12:34″开始拉取,而不是从头开始。
下面介绍具体实现原理:MongShake采用了ACK机制确保oplog成功回放,如果失败将会引发重传,传输重传的过程类似于TCP的滑动窗口机制。这主要是为了保证应用层可靠性而设计的,比如解压缩失败等等。为了更好的进行说明,先来定义几个名词:LSN(Log Sequence Number),表示已经传输的最新的oplog序号。
LSN_ACK(Acked Log Sequence Number),表示已经收到ack确认的最大LSN,即写入tunnel成功的LSN。LSN_CKPT(Checkpoint Log Sequence Number),表示已经做了checkpoint的LSN,即已经持久化的LSN。
LSN、LSN_ACK和LSN_CKPT的值均来自于Oplog的时间戳ts字段,其中隐含约束是:LSN_CKPT<=LSN_ACK<=LSN。
如上图所示,LSN=16表示已经传输了16条oplog,如果没有重传的话,下次将传输LSN=17;LSN_ACK=13表示前13条都已经收到确认,如果需要重传,最早将从LSN=14开始;LSN_CKPT=8表示已经持久化checkpoint=8。
持久化的意义在于,如果此时MongoShake挂掉重启后,源数据库的oplog将从LSN_CKPT位置开始读取而不是从头LSN=1开始读。因为oplog DML的幂等性,同一数据多次传输不会产生问题。但对于DDL,重传可能会导致错误。索引、DDL同步优化 & 4.0事务支持 从v1.5版本开始,MongoShake优化了DDL语句,保证了正确性。
其基本原理是通过添加全局barrier,一旦发现oplog为DDL语句或者索引,那么会等待这条oplog成功写入目的端并更新checkpoint后,才会放开后续的同步。对于DML语句的同步,还是延用之前的并发模式。在以下非常极端的情况下,可能会存在报错,需要运维介入解决:目的端已经写入DDL但是checkpoint还没更新,这个时候MongoShake挂了,那么重启之后这条DDL重传写入将会导致报错。
同样从v1.5版本,MongoShake对事务语句进行了支持。全量同步 从v1.5版本开始,MongoShake支持全量同步,有3种模式可选:全量同步+增量同步,只全量同步,只增量同步。为了保证高效性,内部同样采用并发处理。排障和限速 MongoShake对外提供Restful API,提供实时查看进程内部各队列数据的同步情况,便于问题排查。另外,还提供限速功能,方便用户进行实时控制,减轻数据库压力。
二、方案
多活方案
在开源MongoDB下,可以根据控制流量分发来达到多活的需求。比如下面这个图,需要通过proxy进行流量分发,比如对a, b库的写操作分发到左边的MongoDB,对c库的写操作分发到右边的MongoDB,源库到目的库的MongoShake链路只同步a, b库(MongoShake提供按db、collection的过滤功能),目的库到源库的MongoShake链路只同步c库。这样就解决了环形复制的问题。
容灾方案
MongoShake搭建了异地容灾链路。用户在2个机房分别部署了2套应用,正常情况下,用户流量通过DNS/SLB只访问主应用,然后再访问到主MongoDB,数据通过MongoShake在2个机房的数据库之间进行同步,一旦机房1不可用,DNS/SLB将用户流量切换到备上,然后继续对外提供读写服务
三、验证
https://github.com/alibaba/MongoShake/releases?spm=a2c4e.10696291.0.0.671e19a45EOxuM 安装包下载地址
上传服务器并解压
tar -zxvf mongo-shake-v2.4.20.tar.gz
collector.conf 文件 为主配置文件
1. 从MongoDB副本集同步到MongoDB副本集
假设源端是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端也是三副本:10.5.5.5:5005, 10.6.6.6:6006, 10.7.7.7:7007。同步模式是全量+增量同步。则用户需要修改以下几个参数:
mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端连接串信息,逗号分隔不同的mongod
sync_mode = all # all 表示全量+增量,document表示仅全量,oplog表示仅增量
tunnel.address = mongodb://username:password@10.5.5.5:5005, 10.6.6.6:6006, 10.7.7.7:7007 #目的端连接串信息,逗号分隔不同的mongod
2. 从MongoDB副本集同步到MongoDB集群版
假设源同样是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端是sharding,有多个mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。
mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端连接串信息,逗号分隔不同的mongod
sync_mode = all # all 表示全量+增量,document表示仅全量,oplog表示仅增量
tunnel.address = mongodb://username:password@20.1.1.1:2021;20.2.2.2:2022;20.3.3.3:3033 #目的端连接串信息,分号分割不同的mongos。也可以只配置部分,配置多个mongos可以做负载均衡写入。
3. 从MongoDB集群版同步到MongoDB集群版
假设源是2节点:节点1是10.1.1.1:1001, 10.1.1.2:2002, 10.1.1.3:3003;节点2是10.2.2.1:1001, 10.2.2.2:2002, 10.2.2.3:3003。目的端是sharding,有多个mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。
mongo_urls = mongodb://username1:password1@10.1.1.1:1001,10.1.1.2:2002,10.1.1.3:3003;mongodb://username2:password2@10.2.2.1:1001,10.2.2.2:2002,10.2.2.3:3003 #源端连接串信息,逗号分隔同一个shard不同的mongod,分号分隔不同的shard。
sync_mode = all # all 表示全量+增量,document表示仅全量,oplog表示仅增量
tunnel.address = mongodb://username:password@20.1.1.1:2021;20.2.2.2:2022;20.3.3.3:3033 #目的端连接串信息,分号分割不同的mongos。也可以只配置部分,配置多个mongos可以做负载均衡写入。
context.storage.url = mongodb://username1:password1@10.5.5.5:5555,10.5.5.6:5556 # 如果源端是sharding,此处需要配置源端sharding的cs的地址
4. 从MongoDB副本集同步到kafka通道
假设源同样是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的kafka是50.1.1.1:6379,topic是test。
mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端连接串信息,逗号分隔不同的mongod
sync_mode = oplog # 如果目的端不是mongodb,仅支持增量同步模式
tunnel.type = kafka
tunnel.address = test@50.1.1.1:6379
5. 云上MongoDB副本集的双向同步
云上副本集的双向同步可以参考副本集的单向同步,但是需要注意的有以下几点,假设A和B之间双向同步:
-
1. 需要搭建2个mongoshake,一个从A到B,另一个从B到A
-
2. 两条mongoshake不能同时用全量+增量(all)模式,正常应该是一个库为空(假设B),另一个有数据。那么从A到B先发起一次全量+增量同步,等待全量同步完毕以后,再从B到A发起一次增量同步。
-
3. 双向同步需要依赖gid的开启,这个可以联系烛昭(通过售后联系),开启gid将会重启实例造成秒级别闪断。
-
4. gid用于记录数据的产生地,比如从A产生的数据导入到B以后,不会被再导入回A,这样就不会产生环形复制。需要注意的是,这个gid只能用于增量,这也是第2条为什么一个方向通道是全量+增量,另一个方向通道需要搭建增量的原因。
-
5. 云下开源的mongodb不能使用双向同步,因为gid的修改是在内核里面,所以开源不支持。
-
6. sharding同样也支持双向同步
6. 部分配置文件参数解释
具体请查看配置文件的注释,此处只做简单解释
-
• mongo_urls: 源mongodb的连接地址
-
• mongo_connect_mode: 源端连接的模式,有几种模式可选:从seconary拉取;从primary拉取;secondary优先拉取;单节点拉取
-
• sync_mode: sync模式,有几种模式可选:全量,增量,全量+增量
-
• http_profile: 提供restful接口,用户可以查看一些内部运行情况,也可以对接监控。
-
• system_profile: profile端口,可以查看进程运行的堆栈情况。
-
• log: log日志相关参数。
-
• filter.namespace.black: 黑名单过滤。黑名单内的库表不会被同步,剩下的同步。
-
• filter.namespace.white: 白名单过滤。白名单内的库表会被同步,剩下的过滤掉。黑白名单最多只能配置一个,不配置会同步所有库表。
-
• filter.pass.special.db: 有些特别的库表会被过滤,如admin,local, config库,如果一定要开启,可以在这里进行配置。
-
• oplog.gids: 用于云上双向同步。
-
• shard_key: 内部对数据多线程的哈希方式,默认collection表示按表级别进行哈希。
-
• worker: 增量阶段并发写入的线程数,如果增量阶段性能不够,可以提高这个配置。
-
• worker内部相关配置: worker.batch_queue_size, adaptive.batching_max_size, fetcher.buffer_capacity, 关于内部队列的相关配置,具体请参考github wiki文档。
-
• worker.oplog_compressor: 压缩模式,如果是非direct模式开启这个可以减少网络传输的开销。
-
• tunnel.address: 目的端对接的地址。
-
• context.storage: checkpoint存储的位置,database表示把数据存入MongoDB,api表示把数据存入用户自己提供的http接口。
-
• context.storage.url: checkpoint写入到哪个MongoDB,如果源是sharding,此处配置cs地址,checkpoint会写入admin库;如果是副本集,不配置,会默认写入源库,配置则写入配置的库里面。
-
• context.address: checkpoint写入的表的名字。
-
• context.start_position: checkpoint启动开始拉取的增量时间位点。如果本身checkpoint已经存在(参考上述context的位置),那么则按照context信息进行拉取,如果不存在,则按照这个位点进行增量拉取。
-
• master_quorum: 如果以主备模式拉取同一个源,则这个参数需要启用。
-
• transform.namespace: 命名空间的转换,a.b:c.d表示把源端a库下面的c表同步到目的端c库下面的d表。
-
• replayer.dml_only: 默认不同步DDL,false表示同步DDL。DDL包括建表,删库,建索引等语句。
-
• replayer.executor.upsert: 目的端如果update语句对应的主键id不存在,是否将update语句更改为insert语句。
-
• replayer.executor.insert_on_dup_update: 目的端如果insert语句对应的主键id已经存在,是否将insert语句更改为update语句。
-
• replayer.conflict_write_to: 对于写入冲突的情况,是否需要记录冲突的文档。
-
• replayer.durable: 测试选项,false表示取消写入,只用于拉取调试。
-
• replayer.collection_parallel: 全量同步按表并发的并发度。
-
• replayer.document_parallel: 全量同步同一个表内并发写入的线程数。
-
• replayer.document_batch_size: 全量同步一次性batch的大小。
-
• replayer.collection_drop: 如果目的库表存在,是否先删除目的库再进行同步。
7. 问题总结
权限
对于完全同步,MongoShake需要每个数据库的读取权限。对于增量,MongoShake需要local数据库的读取权限和数据库的写入权限mongoshake。
版本支持
MongoShake不支持3.0以下的MongoDB版本。
MongoShake重启
sync.mode是all,重启后如果检查点存在且有效,这意味着最早的操作日志小于检查点,则MongoShake将仅运行增加同步。否则,MongoShake将再次运行完全同步,此后,将运行增加同步。因此,如果用户仍想运行完全同步但检查点存在,mongoshake.ckpt_default则应手动删除检查点。
MongoShake支持同步DDL
使用replayer.dml_only选项。但是DDL不是幂等操作,一旦失败,oplog可能会重放,因此启用DDL在最新版本中可能会出现问题。
断点恢复
MongoShake支持基于检查点机制的断点恢复,每次启动时,它都会读取检查点,这是一个时间戳,指示已准备好重播多少数据。之后,它将从此时间戳开始从源中提取数据。因此重启时不会丢失数据。
oplog一致性
mongoshake不支持oplog的严格一致性,当shard_key是auto/collection,mongoshake支持顺序一致性其中同一命名空间(装置ns),该序列可以得到保证。如果shard_key为id,则mongoshake支持最终一致性。
监控
MongoShake提供了api(在配置中默认为9100 http_profile)来监控服务器方面的内部状态:
worker:显示内部工作者状态,包括worker_id,jobs_in_queue,jobs_unack_buffer,last_unack,last_ack,count。
sentinel:显示前哨配置:(OplogDump转储oplog日志,“ 0”表示无日志,“ 1”表示采样,“ 2”表示全部转储),DuplicatedDump(如果启用则将重复的oplog写入日志),Pause(如果出现以下情况,整个mongoshake同步将被暂停启用),TPS(控制数据同步的速度)。
repl:显示总体状态:(logs_get我们获得多少操作日志),logs_repl(我们重播多少操作日志),logs_success(我们成功重播多少操作日志),lsn(最后发送),lsn_ack(除0以外的所有工作队列中的最小ack值),lsn_ckpt(检查点)now,replset,tag,who。
conf:显示配置。
可以使用curl命令访问此端口。此外,提供了mongoshake-stat脚本,通过以下实时方式通过静态API监控MongoShake。
./mongoshake-stat --port=9100
结果将每秒清除一次
logs_get:一秒钟内我们获得了多少个oplog。
logs_repl:我们在一秒钟内重播了多少oplog。
logs_success:我们在一秒钟内成功重播了多少操作日志,即TPS。
原文始发于微信公众号(背带裤的云原生):MongoDB异地同步利器:深入解析mongo-shake的魅力
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/219090.html