一文吃透Redis RDB持久化:从原理到源码的全方位解析

一、Redis RDB使用场景

RDB主要用于两方面:

  1. 数据持久化。将内存中的数据集快照保存到磁盘中,实现断电重启后数据恢复。

  2. 主从复制。主节点生成RDB文件发送给从节点,用于初次全量复制或连接重建时快速同步。

这两种场景下,RDB都可以提供比AOF方式更好的恢复速度。

二、RDB配置启动

RDB的相关配置

  1. save:设置触发RDB的条件,例如 save 3600 1 表示距上次保存超过3600秒且至少有1个key变更时触发。

  2. dbfilename:设置RDB文件名称,默认dump.rdb。

  3. dir:设置RDB文件保存目录。

  4. stop-writes-on-bgsave-error:后台保存错误时是否停止写入。

通过配置保存条件,可以控制RDB持久化的频率。

save默认配置

如果配置文件没有配置save,服务启动时相当于配置以下保存策略

  1. # 距离上次生产rdb 1小时以上并且至少发生1次写操作, 开启bg save生产rdb

  2. save 3600 1 # After 3600 seconds (an hour) if at least 1 key changed

  3. # 距离上次生产rdb 5分钟以上并且至少发生100次写操作, 开启bg save生产rdb

  4. save 300 100 # * After 300 seconds (5 minutes) if at least 100 keys changed

  5. # 距离上次生产rdb 1分钟以上并且至少发生10000次写操作, 开启bg save生产rdb

  6. save 60 10000 # After 60 seconds if at least 10000 keys changed

三、RDB工作原理

  1. 保存时,Redis会创建一个子进程,由该子进程负责生成RDB文件。

  2. RDB子进程遍历内存键值对,编码后写入临时文件。

  3. 替换旧RDB文件,更新生成时间戳。

  4. 主节点生成RDB传给从节点,从节点接收加载到内存中。

RDB通过创建子进程实现保存,最大程度减少性能影响。

四、RDB源码剖析

redis源码版本:6.2.5 本文从以下几个场景带着大家走读代码:

  • 启动时 加载rdb文件

  • 定时启动子进程保存rdb

  • save命令和bgsave命令

  • diskless模式rdb处理

rdb相关的变量

在进入具体场景分析前,先来认识下与rdb相关的变量

  1. // path:src/server.h

  2. struct redisServer {

  3. //...

  4. /* RDB persistence */

  5. // rdb相关配置项和全局变量

  6. // 写操作累计的dirty数值(自上次保存以来数据库的变更次数)

  7. long long dirty; /* Changes to DB from the last save */

  8. // 开启后台进程前保存当时dirty值

  9. // 通过dirty-dirty_before_bgsave可以知道在后台子进程生成rdb的过程中

  10. // kv 又产生了多少新的变化

  11. long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */

  12. // bg save 保存策略

  13. // saveparam 包含seconds,代表上次rdb save距离当前至少多少时间就可以再次 save rdb

  14. // saveparam 包含change, 即dirty数值至少大于多少时 就可以再次 save rdb

  15. // 用来判断是否可以开启新的一次 bg save rdb

  16. struct saveparam *saveparams; /* Save points array for RDB */

  17. // saveparams 数组长度

  18. int saveparamslen; /* Number of saving points */

  19. // 保存rdb 文件名

  20. char *rdb_filename; /* Name of RDB file */

  21. // 是否使用压缩

  22. int rdb_compression; /* Use compression in RDB? */

  23. // 是否启用rdb校验和(checksum)

  24. int rdb_checksum; /* Use RDB checksum? */

  25. // 实例未使用持久化时,是否删除仅用于主从同步(SYNC)的rdb文件

  26. int rdb_del_sync_files; /* Remove RDB files used only for SYNC if

  27. the instance does not use persistence. */

  28. // 最后一次save rdb时间戳

  29. time_t lastsave; /* Unix time of last successful save */

  30. // 最后一次尝试 bg save rdb 的时间戳

  31. time_t lastbgsave_try; /* Unix time of last attempted bgsave */

  32. // 最后一个异步生成rdb 使用的时间

  33. time_t rdb_save_time_last; /* Time used by last RDB save run. */

  34. // 当前生产rdb 的开始时间

  35. time_t rdb_save_time_start; /* Current RDB save start time. */

  36. // 设置为1,代表已预约bg save:等到可以执行时(例如当前没有其他子进程)再拉起bg save

  37. // server_cron 会定时检测

  38. int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */

  39. // rdb 类型

  40. // #define RDB_CHILD_TYPE_NONE 0

  41. // #define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */

  42. // #define RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to slave socket. */

  43. int rdb_child_type; /* Type of save by active child. */

  44. // 记录最后一次bg save 成功或者失败

  45. int lastbgsave_status; /* C_OK or C_ERR */

  46. // 如果被设置, 即不能bgsave 就不能写

  47. // 即lastbgsave_status 为err 并且stop_writes_on_bgsave_err被设置

  48. // 无法执行写命令

  49. int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */

  50. // 主进程读取子进程发过来的rdb fd

  51. int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */

  52. /* to the parent process in diskless repl. */

  53. // 主进程通知子进程退出的fd(RDB_CHILD_TYPE_SOCKET模式下有用)

  54. int rdb_child_exit_pipe; /* Used by the diskless parent allow child exit. */

  55. // 保存此次RDB_CHILD_TYPE_SOCKET时, 需要把rdb内容发送给那些副本(从)的客户端

  56. connection **rdb_pipe_conns; /* Connections which are currently the */

  57. // rdb_pipe_conns 数量

  58. int rdb_pipe_numconns; /* target of diskless rdb fork child. */

  59. // 记录多少conns需要等待写事件继续发送rdb

  60. int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */

  61. // 从子进程读取的rdb内容保存在rdb_pipe_buff

  62. char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */

  63. // rdb_pipe_buff的大小

  64. int rdb_pipe_bufflen; /* that was read from the the rdb pipe. */

  65. // 测试用

  66. int rdb_key_save_delay; /* Delay in microseconds between keys while

  67. * writing the RDB. (for testings). negative

  68. * value means fractions of microsecons (on average). */

  69. // 测试用

  70. int key_load_delay; /* Delay in microseconds between keys while

  71. * loading aof or rdb. (for testings). negative

  72. * value means fractions of microsecons (on average). */

  73. // 子进程向主进程发送信息用的管道

  74. /* Pipe and data structures for child -> parent info sharing. */

  75. int child_info_pipe[2]; /* Pipe used to write the child_info_data. */

  76. int child_info_nread; /* Num of bytes of the last read from pipe */

  77. //...

  78. }


启动时加载rdb文件

  1. // path: src/rdb.c

  2. /* Mark that we are loading in the global state and setup the fields

  3. * needed to provide loading stats. */

  4. // 开始加载rdb, 设置一些全局状态

  5. void startLoading(size_t size, int rdbflags) {

  6. /* Load the DB */

  7. server.loading = 1; // 设置loading标记

  8. server.loading_start_time = time(NULL); // 开始时间

  9. server.loading_loaded_bytes = 0; // 已经加载字节数

  10. server.loading_total_bytes = size; // 总共的字节数

  11. server.loading_rdb_used_mem = 0; // 使用内存

  12. blockingOperationStarts();

  13. /* Fire the loading modules start event. */

  14. // 通知modules 事件

  15. int subevent;

  16. if (rdbflags & RDBFLAGS_AOF_PREAMBLE)

  17. subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;

  18. else if(rdbflags & RDBFLAGS_REPLICATION)

  19. subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;

  20. else

  21. subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;

  22. moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);

  23. }

  24. /* Mark that we are loading in the global state and setup the fields

  25. * needed to provide loading stats.

  26. * 'filename' is optional and used for rdb-check on error */

  27. // 开始从文件加载

  28. void startLoadingFile(FILE *fp, char* filename, int rdbflags) {

  29. struct stat sb;

  30. // 通过fstat获取文件大小, 获取失败则大小按0处理

  31. if (fstat(fileno(fp), &sb) == -1)

  32. sb.st_size = 0;

  33. // 记录加载文件名

  34. rdbFileBeingLoaded = filename;

  35. // 调用startLoading 设置一些全局状态

  36. startLoading(sb.st_size, rdbflags);

  37. }

  38. /* Refresh the loading progress info */

  39. // 刷新加载的进度 和内存使用量

  40. void loadingProgress(off_t pos) {

  41. server.loading_loaded_bytes = pos;

  42. if (server.stat_peak_memory < zmalloc_used_memory())

  43. server.stat_peak_memory = zmalloc_used_memory();

  44. }

  45. /* Loading finished */

  46. // 停止加载,设置一些全局变量

  47. void stopLoading(int success) {

  48. server.loading = 0;

  49. blockingOperationEnds();

  50. rdbFileBeingLoaded = NULL;

  51. /* Fire the loading modules end event. */

  52. moduleFireServerEvent(REDISMODULE_EVENT_LOADING,

  53. success?

  54. REDISMODULE_SUBEVENT_LOADING_ENDED:

  55. REDISMODULE_SUBEVENT_LOADING_FAILED,

  56. NULL);

  57. }

  58. /* Load a "type" in RDB format, that is a one byte unsigned integer.

  59. * This function is not only used to load object types, but also special

  60. * "types" like the end-of-file type, the EXPIRE type, and so forth. */

  61. // 读取1字节内容作为type

  62. int rdbLoadType(rio *rdb) {

  63. unsigned char type;

  64. if (rioRead(rdb,&type,1) == 0) return -1;

  65. return type;

  66. }

  67. /* Load a string object from an RDB file according to flags:

  68. *

  69. * RDB_LOAD_NONE (no flags): load an RDB object, unencoded.

  70. * RDB_LOAD_ENC: If the returned type is a Redis object, try to

  71. * encode it in a special way to be more memory

  72. * efficient. When this flag is passed the function

  73. * no longer guarantees that obj->ptr is an SDS string.

  74. * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc()

  75. * instead of a Redis object with an sds in it.

  76. * RDB_LOAD_SDS: Return an SDS string instead of a Redis object.

  77. *

  78. * On I/O error NULL is returned.

  79. */

  80. // 从rdb 加载string

  81. void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {

  82. int encode = flags & RDB_LOAD_ENC;

  83. int plain = flags & RDB_LOAD_PLAIN;

  84. int sds = flags & RDB_LOAD_SDS;

  85. int isencoded;

  86. unsigned long long len;

  87. // 先读长度, isencoded代表是RDB_ENCVAL编码

  88. len = rdbLoadLen(rdb,&isencoded);

  89. if (len == RDB_LENERR) return NULL;

  90. // 如果是RDB_ENCVAL, 具体读取类型

  91. if (isencoded) {

  92. switch(len) {

  93. case RDB_ENC_INT8:

  94. case RDB_ENC_INT16:

  95. case RDB_ENC_INT32:

  96. // string压缩为int,需要反解为string

  97. return rdbLoadIntegerObject(rdb,len,flags,lenptr);

  98. case RDB_ENC_LZF:

  99. // 压缩为Lzf的字符串,解压缩为正常字符串

  100. return rdbLoadLzfStringObject(rdb,flags,lenptr);

  101. default:

  102. rdbReportCorruptRDB("Unknown RDB string encoding type %llu",len);

  103. return NULL;

  104. }

  105. }

  106. // 没有压缩,直接读取原始字节返回

  107. // 根据flag 返回不同的类型

  108. if (plain || sds) {

  109. void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT,len);

  110. if (!buf) {

  111. serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len);

  112. return NULL;

  113. }

  114. if (lenptr) *lenptr = len;

  115. if (len && rioRead(rdb,buf,len) == 0) {

  116. if (plain)

  117. zfree(buf);

  118. else

  119. sdsfree(buf);

  120. return NULL;

  121. }

  122. return buf;

  123. } else {

  124. robj *o = encode ? createStringObject(SDS_NOINIT,len) :

  125. createRawStringObject(SDS_NOINIT,len);

  126. if (len && rioRead(rdb,o->ptr,len) == 0) {

  127. decrRefCount(o);

  128. return NULL;

  129. }

  130. return o;

  131. }

  132. }

  133. // 从rdb读取string

  134. // 要求返回obj-raw

  135. robj *rdbLoadStringObject(rio *rdb) {

  136. return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);

  137. }

  138. // 从rdb读取string

  139. // 要求返回obj(带压缩)

  140. robj *rdbLoadEncodedStringObject(rio *rdb) {

  141. return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL);

  142. }

  143. /* Load a Redis object of the specified type from the specified file.

  144. * On success a newly allocated object is returned, otherwise NULL. */

  145. // 加载rdb object

  146. robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {

  147. robj *o = NULL, *ele, *dec;

  148. uint64_t len;

  149. unsigned int i;

  150. int deep_integrity_validation = server.sanitize_dump_payload == SANITIZE_DUMP_YES;

  151. if (server.sanitize_dump_payload == SANITIZE_DUMP_CLIENTS) {

  152. /* Skip sanitization when loading (an RDB), or getting a RESTORE command

  153. * from either the master or a client using an ACL user with the skip-sanitize-payload flag. */

  154. int skip = server.loading ||

  155. (server.current_client && (server.current_client->flags & CLIENT_MASTER));

  156. if (!skip && server.current_client && server.current_client->user)

  157. skip = !!(server.current_client->user->flags & USER_FLAG_SANITIZE_PAYLOAD_SKIP);

  158. deep_integrity_validation = !skip;

  159. }

  160. // 判断RDB_Type

  161. // 根据不同type进行加载

  162. if (rdbtype == RDB_TYPE_STRING) {

  163. /* Read string value */

  164. if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;

  165. o = tryObjectEncoding(o);

  166. } else if (rdbtype == RDB_TYPE_LIST) {

  167. /* Read list value */

  168. // list, 先读取数量

  169. if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;

  170. o = createQuicklistObject();

  171. quicklistSetOptions(o->ptr, server.list_max_ziplist_size,

  172. server.list_compress_depth);

  173. /* Load every single element of the list */

  174. // 逐个读取

  175. while(len--) {

  176. if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) {

  177. decrRefCount(o);

  178. return NULL;

  179. }

  180. dec = getDecodedObject(ele);

  181. size_t len = sdslen(dec->ptr);

  182. quicklistPushTail(o->ptr, dec->ptr, len);

  183. decrRefCount(dec);

  184. decrRefCount(ele);

  185. }

  186. } else if (rdbtype == RDB_TYPE_SET) {

  187. // Set 读取长度

  188. /* Read Set value */

  189. if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;

  190. /* Use a regular set when there are too many entries. */

  191. // 数量是否超过压缩大小,创建不同对象

  192. if (len > server.set_max_intset_entries) {

  193. o = createSetObject();

  194. /* It's faster to expand the dict to the right size asap in order

  195. * to avoid rehashing */

  196. if (len > DICT_HT_INITIAL_SIZE && dictTryExpand(o->ptr,len) != DICT_OK) {

  197. rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len);

  198. decrRefCount(o);

  199. return NULL;

  200. }

  201. } else {

  202. o = createIntsetObject();

  203. }

  204. /* Load every single element of the set */

  205. // 逐个读取原始

  206. for (i = 0; i < len; i++) {

  207. long long llval;

  208. sds sdsele;

  209. if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {

  210. decrRefCount(o);

  211. return NULL;

  212. }

  213. // 判断是否是压缩做不同逻辑

  214. if (o->encoding == OBJ_ENCODING_INTSET) {

  215. /* Fetch integer value from element. */

  216. if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {

  217. uint8_t success;

  218. o->ptr = intsetAdd(o->ptr,llval,&success);

  219. if (!success) {

  220. rdbReportCorruptRDB("Duplicate set members detected");

  221. decrRefCount(o);

  222. sdsfree(sdsele);

  223. return NULL;

  224. }

  225. } else {

  226. setTypeConvert(o,OBJ_ENCODING_HT);

  227. if (dictTryExpand(o->ptr,len) != DICT_OK) {

  228. rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len);

  229. sdsfree(sdsele);

  230. decrRefCount(o);

  231. return NULL;

  232. }

  233. }

  234. }

  235. /* This will also be called when the set was just converted

  236. * to a regular hash table encoded set. */

  237. if (o->encoding == OBJ_ENCODING_HT) {

  238. if (dictAdd((dict*)o->ptr,sdsele,NULL) != DICT_OK) {

  239. rdbReportCorruptRDB("Duplicate set members detected");

  240. decrRefCount(o);

  241. sdsfree(sdsele);

  242. return NULL;

  243. }

  244. } else {

  245. sdsfree(sdsele);

  246. }

  247. }

  248. } else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) {

  249. /* Read list/set value. */

  250. uint64_t zsetlen;

  251. size_t maxelelen = 0;

  252. zset *zs;

  253. // zset, 读取元素数量

  254. if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;

  255. o = createZsetObject();

  256. zs = o->ptr;

  257. if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict,zsetlen) != DICT_OK) {

  258. rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)zsetlen);

  259. decrRefCount(o);

  260. return NULL;

  261. }

  262. /* Load every single element of the sorted set. */

  263. // 逐个元素加载

  264. while(zsetlen--) {

  265. sds sdsele;

  266. double score;

  267. zskiplistNode *znode;

  268. if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {

  269. decrRefCount(o);

  270. return NULL;

  271. }

  272. if (rdbtype == RDB_TYPE_ZSET_2) {

  273. if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {

  274. decrRefCount(o);

  275. sdsfree(sdsele);

  276. return NULL;

  277. }

  278. } else {

  279. if (rdbLoadDoubleValue(rdb,&score) == -1) {

  280. decrRefCount(o);

  281. sdsfree(sdsele);

  282. return NULL;

  283. }

  284. }

  285. /* Don't care about integer-encoded strings. */

  286. if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);

  287. znode = zslInsert(zs->zsl,score,sdsele);

  288. if (dictAdd(zs->dict,sdsele,&znode->score) != DICT_OK) {

  289. rdbReportCorruptRDB("Duplicate zset fields detected");

  290. decrRefCount(o);

  291. /* no need to free 'sdsele', will be released by zslFree together with 'o' */

  292. return NULL;

  293. }

  294. }

  295. /* Convert *after* loading, since sorted sets are not stored ordered. */

  296. if (zsetLength(o) <= server.zset_max_ziplist_entries &&

  297. maxelelen <= server.zset_max_ziplist_value)

  298. zsetConvert(o,OBJ_ENCODING_ZIPLIST);

  299. } else if (rdbtype == RDB_TYPE_HASH) {

  300. uint64_t len;

  301. int ret;

  302. sds field, value;

  303. dict *dupSearchDict = NULL;

  304. // 加载hash 元素数量

  305. len = rdbLoadLen(rdb, NULL);

  306. if (len == RDB_LENERR) return NULL;

  307. o = createHashObject();

  308. // 是否要用压缩类型

  309. /* Too many entries? Use a hash table right from the start. */

  310. if (len > server.hash_max_ziplist_entries)

  311. hashTypeConvert(o, OBJ_ENCODING_HT);

  312. else if (deep_integrity_validation) {

  313. /* In this mode, we need to guarantee that the server won't crash

  314. * later when the ziplist is converted to a dict.

  315. * Create a set (dict with no values) to for a dup search.

  316. * We can dismiss it as soon as we convert the ziplist to a hash. */

  317. dupSearchDict = dictCreate(&hashDictType, NULL);

  318. }

  319. /* Load every field and value into the ziplist */

  320. // OBJ_ENCODING_ZIPLIST类型处理

  321. while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {

  322. len--;

  323. /* Load raw strings */

  324. if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {

  325. decrRefCount(o);

  326. if (dupSearchDict) dictRelease(dupSearchDict);

  327. return NULL;

  328. }

  329. if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {

  330. sdsfree(field);

  331. decrRefCount(o);

  332. if (dupSearchDict) dictRelease(dupSearchDict);

  333. return NULL;

  334. }

  335. if (dupSearchDict) {

  336. sds field_dup = sdsdup(field);

  337. if (dictAdd(dupSearchDict, field_dup, NULL) != DICT_OK) {

  338. rdbReportCorruptRDB("Hash with dup elements");

  339. dictRelease(dupSearchDict);

  340. decrRefCount(o);

  341. sdsfree(field_dup);

  342. sdsfree(field);

  343. sdsfree(value);

  344. return NULL;

  345. }

  346. }

  347. /* Add pair to ziplist */

  348. o->ptr = ziplistPush(o->ptr, (unsigned char*)field,

  349. sdslen(field), ZIPLIST_TAIL);

  350. o->ptr = ziplistPush(o->ptr, (unsigned char*)value,

  351. sdslen(value), ZIPLIST_TAIL);

  352. /* Convert to hash table if size threshold is exceeded */

  353. if (sdslen(field) > server.hash_max_ziplist_value ||

  354. sdslen(value) > server.hash_max_ziplist_value)

  355. {

  356. sdsfree(field);

  357. sdsfree(value);

  358. hashTypeConvert(o, OBJ_ENCODING_HT);

  359. break;

  360. }

  361. sdsfree(field);

  362. sdsfree(value);

  363. }

  364. if (dupSearchDict) {

  365. /* We no longer need this, from now on the entries are added

  366. * to a dict so the check is performed implicitly. */

  367. dictRelease(dupSearchDict);

  368. dupSearchDict = NULL;

  369. }

  370. if (o->encoding == OBJ_ENCODING_HT && len > DICT_HT_INITIAL_SIZE) {

  371. if (dictTryExpand(o->ptr,len) != DICT_OK) {

  372. rdbReportCorruptRDB("OOM in dictTryExpand %llu", (unsigned long long)len);

  373. decrRefCount(o);

  374. return NULL;

  375. }

  376. }

  377. /* Load remaining fields and values into the hash table */

  378. // OBJ_ENCODING_HT类型处理

  379. while (o->encoding == OBJ_ENCODING_HT && len > 0) {

  380. len--;

  381. /* Load encoded strings */

  382. if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {

  383. decrRefCount(o);

  384. return NULL;

  385. }

  386. if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {

  387. sdsfree(field);

  388. decrRefCount(o);

  389. return NULL;

  390. }

  391. /* Add pair to hash table */

  392. ret = dictAdd((dict*)o->ptr, field, value);

  393. if (ret == DICT_ERR) {

  394. rdbReportCorruptRDB("Duplicate hash fields detected");

  395. sdsfree(value);

  396. sdsfree(field);

  397. decrRefCount(o);

  398. return NULL;

  399. }

  400. }

  401. /* All pairs should be read by now */

  402. serverAssert(len == 0);

  403. } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) {

  404. // list,先加载数量

  405. if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;

  406. o = createQuicklistObject();

  407. quicklistSetOptions(o->ptr, server.list_max_ziplist_size,

  408. server.list_compress_depth);

  409. // 逐个元素加载

  410. while (len--) {

  411. size_t encoded_len;

  412. unsigned char *zl =

  413. rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&encoded_len);

  414. if (zl == NULL) {

  415. decrRefCount(o);

  416. return NULL;

  417. }

  418. if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;

  419. if (!ziplistValidateIntegrity(zl, encoded_len, deep_integrity_validation, NULL, NULL)) {

  420. rdbReportCorruptRDB("Ziplist integrity check failed.");

  421. decrRefCount(o);

  422. zfree(zl);

  423. return NULL;

  424. }

  425. quicklistAppendZiplist(o->ptr, zl);

  426. }

  427. } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP ||

  428. rdbtype == RDB_TYPE_LIST_ZIPLIST ||

  429. rdbtype == RDB_TYPE_SET_INTSET ||

  430. rdbtype == RDB_TYPE_ZSET_ZIPLIST ||

  431. rdbtype == RDB_TYPE_HASH_ZIPLIST)

  432. {

  433. // rdb 里保存的就是压缩类型的字节流

  434. size_t encoded_len;

  435. // 把字节流读出来

  436. unsigned char *encoded =

  437. rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&encoded_len);

  438. if (encoded == NULL) return NULL;

  439. o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */

  440. /* Fix the object encoding, and make sure to convert the encoded

  441. * data type into the base type if accordingly to the current

  442. * configuration there are too many elements in the encoded data

  443. * type. Note that we only check the length and not max element

  444. * size as this is an O(N) scan. Eventually everything will get

  445. * converted. */

  446. // 根据rdb 不同压缩类型处理

  447. switch(rdbtype) {

  448. case RDB_TYPE_HASH_ZIPMAP:

  449. // hash 类型处理

  450. /* Since we don't keep zipmaps anymore, the rdb loading for these

  451. * is O(n) anyway, use `deep` validation. */

  452. if (!zipmapValidateIntegrity(encoded, encoded_len, 1)) {

  453. rdbReportCorruptRDB("Zipmap integrity check failed.");

  454. zfree(encoded);

  455. o->ptr = NULL;

  456. decrRefCount(o);

  457. return NULL;

  458. }

  459. /* Convert to ziplist encoded hash. This must be deprecated

  460. * when loading dumps created by Redis 2.4 gets deprecated. */

  461. {

  462. unsigned char *zl = ziplistNew();

  463. unsigned char *zi = zipmapRewind(o->ptr);

  464. unsigned char *fstr, *vstr;

  465. unsigned int flen, vlen;

  466. unsigned int maxlen = 0;

  467. dict *dupSearchDict = dictCreate(&hashDictType, NULL);

  468. while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {

  469. if (flen > maxlen) maxlen = flen;

  470. if (vlen > maxlen) maxlen = vlen;

  471. zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);

  472. zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);

  473. /* search for duplicate records */

  474. sds field = sdstrynewlen(fstr, flen);

  475. if (!field || dictAdd(dupSearchDict, field, NULL) != DICT_OK) {

  476. rdbReportCorruptRDB("Hash zipmap with dup elements, or big length (%u)", flen);

  477. dictRelease(dupSearchDict);

  478. sdsfree(field);

  479. zfree(encoded);

  480. o->ptr = NULL;

  481. decrRefCount(o);

  482. return NULL;

  483. }

  484. }

  485. dictRelease(dupSearchDict);

  486. zfree(o->ptr);

  487. o->ptr = zl;

  488. o->type = OBJ_HASH;

  489. o->encoding = OBJ_ENCODING_ZIPLIST;

  490. if (hashTypeLength(o) > server.hash_max_ziplist_entries ||

  491. maxlen > server.hash_max_ziplist_value)

  492. {

  493. hashTypeConvert(o, OBJ_ENCODING_HT);

  494. }

  495. }

  496. break;

  497. case RDB_TYPE_LIST_ZIPLIST:

  498. // list 类型处理

  499. if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;

  500. // 检测是否有效list

  501. if (!ziplistValidateIntegrity(encoded, encoded_len, deep_integrity_validation, NULL, NULL)) {

  502. rdbReportCorruptRDB("List ziplist integrity check failed.");

  503. zfree(encoded);

  504. o->ptr = NULL;

  505. decrRefCount(o);

  506. return NULL;

  507. }

  508. // 有效,直接使用二进制字节流

  509. o->type = OBJ_LIST;

  510. o->encoding = OBJ_ENCODING_ZIPLIST;

  511. listTypeConvert(o,OBJ_ENCODING_QUICKLIST);

  512. break;

  513. case RDB_TYPE_SET_INTSET:

  514. // set 类型处理

  515. if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;

  516. // 检测是否有效intset

  517. if (!intsetValidateIntegrity(encoded, encoded_len, deep_integrity_validation)) {

  518. rdbReportCorruptRDB("Intset integrity check failed.");

  519. zfree(encoded);

  520. o->ptr = NULL;

  521. decrRefCount(o);

  522. return NULL;

  523. }

  524. // 有效,直接使用二进制字节流

  525. o->type = OBJ_SET;

  526. o->encoding = OBJ_ENCODING_INTSET;

  527. if (intsetLen(o->ptr) > server.set_max_intset_entries)

  528. // 超出压缩要求大小,转化成dict

  529. setTypeConvert(o,OBJ_ENCODING_HT);

  530. break;

  531. case RDB_TYPE_ZSET_ZIPLIST:

  532. // zset 类型处理

  533. if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;

  534. // 检测是否有效

  535. if (!zsetZiplistValidateIntegrity(encoded, encoded_len, deep_integrity_validation)) {

  536. rdbReportCorruptRDB("Zset ziplist integrity check failed.");

  537. zfree(encoded);

  538. o->ptr = NULL;

  539. decrRefCount(o);

  540. return NULL;

  541. }

  542. // 有效,直接使用二进制字节流

  543. o->type = OBJ_ZSET;

  544. o->encoding = OBJ_ENCODING_ZIPLIST;

  545. if (zsetLength(o) > server.zset_max_ziplist_entries)

  546. // 超出压缩要求大小,转化成skiplist

  547. zsetConvert(o,OBJ_ENCODING_SKIPLIST);

  548. break;

  549. case RDB_TYPE_HASH_ZIPLIST:

  550. // hash 类型处理

  551. if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;

  552. // 检测是否有效

  553. if (!hashZiplistValidateIntegrity(encoded, encoded_len, deep_integrity_validation)) {

  554. rdbReportCorruptRDB("Hash ziplist integrity check failed.");

  555. zfree(encoded);

  556. o->ptr = NULL;

  557. decrRefCount(o);

  558. return NULL;

  559. }

  560. // 有效,直接使用二进制字节流

  561. o->type = OBJ_HASH;

  562. o->encoding = OBJ_ENCODING_ZIPLIST;

  563. if (hashTypeLength(o) > server.hash_max_ziplist_entries)

  564. // 超出压缩要求大小,转化成HT

  565. hashTypeConvert(o, OBJ_ENCODING_HT);

  566. break;

  567. default:

  568. /* totally unreachable */

  569. rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);

  570. break;

  571. }

  572. } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {

  573. // stream 类型处理

  574. // ...

  575. } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {

  576. // MODULE 类型处理

  577. // // ...

  578. } else {

  579. rdbReportReadError("Unknown RDB encoding type %d",rdbtype);

  580. return NULL;

  581. }

  582. return o;

  583. }

  584. /* Track loading progress in order to serve client's from time to time

  585. and if needed calculate rdb checksum */

  586. // rdb 加载进度回调, 设置在rio中,由rio回调

  587. void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {

  588. if (server.rdb_checksum)

  589. //更新rdb的checksum

  590. rioGenericUpdateChecksum(r, buf, len);

  591. if (server.loading_process_events_interval_bytes &&

  592. (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes)

  593. {

  594. // 连着主redis,并且处于接收rdb阶段

  595. if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)

  596. // 回复master 告诉它我们收到并加载rdb内容

  597. replicationSendNewlineToMaster();

  598. // 设置加载进度

  599. loadingProgress(r->processed_bytes);

  600. // 定时做一些事件处理(epoll_wait,处理客户端读写事件)

  601. processEventsWhileBlocked();

  602. processModuleLoadingProgressEvent(0);

  603. }

  604. }

  605. /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,

  606. * otherwise C_ERR is returned and 'errno' is set accordingly. */

  607. // 从rio(文件fd或者socket fd)中加载 rdb 到内存db中

  608. // 循环读取文件内容, 解析成redis kv等不同变量和指令,加载到内存中

  609. int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {

  610. uint64_t dbid;

  611. int type, rdbver;

  612. redisDb *db = server.db+0;

  613. char buf[1024];

  614. // 设置rio回调

  615. rdb->update_cksum = rdbLoadProgressCallback;

  616. rdb->max_processing_chunk = server.loading_process_events_interval_bytes;

  617. // 读取magic

  618. if (rioRead(rdb,buf,9) == 0) goto eoferr;

  619. buf[9] = '\0';

  620. if (memcmp(buf,"REDIS",5) != 0) {

  621. serverLog(LL_WARNING,"Wrong signature trying to load DB from file");

  622. errno = EINVAL;

  623. return C_ERR;

  624. }

  625. // 获取rdb 版本

  626. rdbver = atoi(buf+5);

  627. if (rdbver < 1 || rdbver > RDB_VERSION) {

  628. serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);

  629. errno = EINVAL;

  630. return C_ERR;

  631. }

  632. /* Key-specific attributes, set by opcodes before the key type. */

  633. long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();

  634. long long lru_clock = LRU_CLOCK();

  635. while(1) {

  636. sds key;

  637. robj *val;

  638. /* Read type. */

  639. // 读取一个字节的type

  640. if ((type = rdbLoadType(rdb)) == -1) goto eoferr;

  641. // 根据不同类型做不同处理

  642. /* Handle special types. */

  643. if (type == RDB_OPCODE_EXPIRETIME) {

  644. /* EXPIRETIME: load an expire associated with the next key

  645. * to load. Note that after loading an expire we need to

  646. * load the actual type, and continue. */

  647. // 超时类型,读取时间戳

  648. expiretime = rdbLoadTime(rdb);

  649. expiretime *= 1000;

  650. if (rioGetReadError(rdb)) goto eoferr;

  651. continue; /* Read next opcode. */

  652. } else if (type == RDB_OPCODE_EXPIRETIME_MS) {

  653. /* EXPIRETIME_MS: milliseconds precision expire times introduced

  654. * with RDB v3. Like EXPIRETIME but no with more precision. */

  655. // 超时类型,读取毫秒时间戳

  656. expiretime = rdbLoadMillisecondTime(rdb,rdbver);

  657. if (rioGetReadError(rdb)) goto eoferr;

  658. continue; /* Read next opcode. */

  659. } else if (type == RDB_OPCODE_FREQ) {

  660. /* FREQ: LFU frequency. */

  661. // LFU 数值

  662. uint8_t byte;

  663. if (rioRead(rdb,&byte,1) == 0) goto eoferr;

  664. lfu_freq = byte;

  665. continue; /* Read next opcode. */

  666. } else if (type == RDB_OPCODE_IDLE) {

  667. /* IDLE: LRU idle time. */

  668. // LRU 时间

  669. uint64_t qword;

  670. // 读取时间戳

  671. if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;

  672. lru_idle = qword;

  673. continue; /* Read next opcode. */

  674. } else if (type == RDB_OPCODE_EOF) {

  675. // rdb 文件结束了

  676. /* EOF: End of file, exit the main loop. */

  677. break;

  678. } else if (type == RDB_OPCODE_SELECTDB) {

  679. // db序号, 读取并且设置当前db序号

  680. /* SELECTDB: Select the specified database. */

  681. if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;

  682. if (dbid >= (unsigned)server.dbnum) {

  683. serverLog(LL_WARNING,

  684. "FATAL: Data file was created with a Redis "

  685. "server configured to handle more than %d "

  686. "databases. Exiting\n", server.dbnum);

  687. exit(1);

  688. }

  689. db = server.db+dbid;

  690. continue; /* Read next opcode. */

  691. } else if (type == RDB_OPCODE_RESIZEDB) {

  692. /* RESIZEDB: Hint about the size of the keys in the currently

  693. * selected data base, in order to avoid useless rehashing. */

  694. uint64_t db_size, expires_size;

  695. // 具体db 的dict和expires 大小

  696. if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)

  697. goto eoferr;

  698. if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)

  699. goto eoferr;

  700. dictExpand(db->dict,db_size);

  701. dictExpand(db->expires,expires_size);

  702. continue; /* Read next opcode. */

  703. } else if (type == RDB_OPCODE_AUX) {

  704. // 全局变量

  705. /* AUX: generic string-string fields. Use to add state to RDB

  706. * which is backward compatible. Implementations of RDB loading

  707. * are required to skip AUX fields they don't understand.

  708. *

  709. * An AUX field is composed of two strings: key and value. */

  710. robj *auxkey, *auxval;

  711. if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;

  712. if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;

  713. if (((char*)auxkey->ptr)[0] == '%') {

  714. /* All the fields with a name staring with '%' are considered

  715. * information fields and are logged at startup with a log

  716. * level of NOTICE. */

  717. serverLog(LL_NOTICE,"RDB '%s': %s",

  718. (char*)auxkey->ptr,

  719. (char*)auxval->ptr);

  720. } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {

  721. if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);

  722. } else if (!strcasecmp(auxkey->ptr,"repl-id")) {

  723. if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {

  724. memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);

  725. rsi->repl_id_is_set = 1;

  726. }

  727. } else if (!strcasecmp(auxkey->ptr,"repl-offset")) {

  728. if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);

  729. } else if (!strcasecmp(auxkey->ptr,"lua")) {

  730. /* Load the script back in memory. */

  731. if (luaCreateFunction(NULL,server.lua,auxval) == NULL) {

  732. rdbReportCorruptRDB(

  733. "Can't load Lua script from RDB file! "

  734. "BODY: %s", (char*)auxval->ptr);

  735. }

  736. } else if (!strcasecmp(auxkey->ptr,"redis-ver")) {

  737. serverLog(LL_NOTICE,"Loading RDB produced by version %s",

  738. (char*)auxval->ptr);

  739. } else if (!strcasecmp(auxkey->ptr,"ctime")) {

  740. time_t age = time(NULL)-strtol(auxval->ptr,NULL,10);

  741. if (age < 0) age = 0;

  742. serverLog(LL_NOTICE,"RDB age %ld seconds",

  743. (unsigned long) age);

  744. } else if (!strcasecmp(auxkey->ptr,"used-mem")) {

  745. long long usedmem = strtoll(auxval->ptr,NULL,10);

  746. serverLog(LL_NOTICE,"RDB memory usage when created %.2f Mb",

  747. (double) usedmem / (1024*1024));

  748. server.loading_rdb_used_mem = usedmem;

  749. } else if (!strcasecmp(auxkey->ptr,"aof-preamble")) {

  750. long long haspreamble = strtoll(auxval->ptr,NULL,10);

  751. if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail");

  752. } else if (!strcasecmp(auxkey->ptr,"redis-bits")) {

  753. /* Just ignored. */

  754. } else {

  755. /* We ignore fields we don't understand, as by AUX field

  756. * contract. */

  757. serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",

  758. (char*)auxkey->ptr);

  759. }

  760. decrRefCount(auxkey);

  761. decrRefCount(auxval);

  762. continue; /* Read type again. */

  763. } else if (type == RDB_OPCODE_MODULE_AUX) {

  764. // MODULE相关变量

  765. /* Load module data that is not related to the Redis key space.

  766. * Such data can be potentially be stored both before and after the

  767. * RDB keys-values section. */

  768. uint64_t moduleid = rdbLoadLen(rdb,NULL);

  769. int when_opcode = rdbLoadLen(rdb,NULL);

  770. int when = rdbLoadLen(rdb,NULL);

  771. if (rioGetReadError(rdb)) goto eoferr;

  772. if (when_opcode != RDB_MODULE_OPCODE_UINT) {

  773. rdbReportReadError("bad when_opcode");

  774. goto eoferr;

  775. }

  776. moduleType *mt = moduleTypeLookupModuleByID(moduleid);

  777. char name[10];

  778. moduleTypeNameByID(name,moduleid);

  779. if (!rdbCheckMode && mt == NULL) {

  780. /* Unknown module. */

  781. serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);

  782. exit(1);

  783. } else if (!rdbCheckMode && mt != NULL) {

  784. if (!mt->aux_load) {

  785. /* Module doesn't support AUX. */

  786. serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);

  787. exit(1);

  788. }

  789. RedisModuleIO io;

  790. moduleInitIOContext(io,mt,rdb,NULL);

  791. io.ver = 2;

  792. /* Call the rdb_load method of the module providing the 10 bit

  793. * encoding version in the lower 10 bits of the module ID. */

  794. if (mt->aux_load(&io,moduleid&1023, when) != REDISMODULE_OK || io.error) {

  795. moduleTypeNameByID(name,moduleid);

  796. serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);

  797. goto eoferr;

  798. }

  799. if (io.ctx) {

  800. moduleFreeContext(io.ctx);

  801. zfree(io.ctx);

  802. }

  803. uint64_t eof = rdbLoadLen(rdb,NULL);

  804. if (eof != RDB_MODULE_OPCODE_EOF) {

  805. serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);

  806. goto eoferr;

  807. }

  808. continue;

  809. } else {

  810. /* RDB check mode. */

  811. robj *aux = rdbLoadCheckModuleValue(rdb,name);

  812. decrRefCount(aux);

  813. continue; /* Read next opcode. */

  814. }

  815. }

  816. // 来到这里就是读取 kv

  817. /* Read key */

  818. if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)

  819. goto eoferr;

  820. /* Read value */

  821. // 加载value, 此时type是具体的value 类型

  822. if ((val = rdbLoadObject(type,rdb,key)) == NULL) {

  823. sdsfree(key);

  824. goto eoferr;

  825. }

  826. /* Check if the key already expired. This function is used when loading

  827. * an RDB file from disk, either at startup, or when an RDB was

  828. * received from the master. In the latter case, the master is

  829. * responsible for key expiry. If we would expire keys here, the

  830. * snapshot taken by the master may not be reflected on the slave.

  831. * Similarly if the RDB is the preamble of an AOF file, we want to

  832. * load all the keys as they are, since the log of operations later

  833. * assume to work in an exact keyspace state. */

  834. if (iAmMaster() &&

  835. !(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&

  836. expiretime != -1 && expiretime < now)

  837. {

  838. // 本身是主,并且不是aof混合模式, 过期时间小于现在(已经过期,直接删掉)

  839. sdsfree(key);

  840. decrRefCount(val);

  841. } else {

  842. // 把key val 添加到对应db中

  843. robj keyobj;

  844. initStaticStringObject(keyobj,key);

  845. /* Add the new object in the hash table */

  846. int added = dbAddRDBLoad(db,key,val);

  847. if (!added) {

  848. // 现在db已经有了,不合理,看是否能够容忍

  849. if (rdbflags & RDBFLAGS_ALLOW_DUP) {

  850. /* This flag is useful for DEBUG RELOAD special modes.

  851. * When it's set we allow new keys to replace the current

  852. * keys with the same name. */

  853. // 同步删除已存在的旧key (dbSyncDelete)

  854. dbSyncDelete(db,&keyobj);

  855. // 再设置

  856. dbAddRDBLoad(db,key,val);

  857. } else {

  858. // 不能容忍直接报错

  859. serverLog(LL_WARNING,

  860. "RDB has duplicated key '%s' in DB %d",key,db->id);

  861. serverPanic("Duplicated key found in RDB file");

  862. }

  863. }

  864. // 设置过期时间

  865. /* Set the expire time if needed */

  866. if (expiretime != -1) {

  867. setExpire(NULL,db,&keyobj,expiretime);

  868. }

  869. // 设置对象的LRU/LFU使用信息(用于内存淘汰)

  870. /* Set usage information (for eviction). */

  871. objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);

  872. // 产生 module event

  873. /* call key space notification on key loaded for modules only */

  874. moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id);

  875. }

  876. /* Loading the database more slowly is useful in order to test

  877. * certain edge cases. */

  878. if (server.key_load_delay)

  879. debugDelay(server.key_load_delay);

  880. /* Reset the state that is key-specified and is populated by

  881. * opcodes before the key, so that we start from scratch again. */

  882. expiretime = -1;

  883. lfu_freq = -1;

  884. lru_idle = -1;

  885. }

  886. // rdb 版本>= 5, 校验checksum

  887. /* Verify the checksum if RDB version is >= 5 */

  888. if (rdbver >= 5) {

  889. uint64_t cksum, expected = rdb->cksum;

  890. if (rioRead(rdb,&cksum,8) == 0) goto eoferr;

  891. if (server.rdb_checksum && !server.skip_checksum_validation) {

  892. memrev64ifbe(&cksum);

  893. if (cksum == 0) {

  894. serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");

  895. } else if (cksum != expected) {

  896. serverLog(LL_WARNING,"Wrong RDB checksum expected: (%llx) but "

  897. "got (%llx). Aborting now.",

  898. (unsigned long long)expected,

  899. (unsigned long long)cksum);

  900. rdbReportCorruptRDB("RDB CRC error");

  901. return C_ERR;

  902. }

  903. }

  904. }

  905. return C_OK;

  906. /* Unexpected end of file is handled here calling rdbReportReadError():

  907. * this will in turn either abort Redis in most cases, or if we are loading

  908. * the RDB file from a socket during initial SYNC (diskless replica mode),

  909. * we'll report the error to the caller, so that we can retry. */

  910. eoferr:

  911. serverLog(LL_WARNING,

  912. "Short read or OOM loading DB. Unrecoverable error, aborting now.");

  913. rdbReportReadError("Unexpected EOF reading RDB file");

  914. return C_ERR;

  915. }

  916. /* Like rdbLoadRio() but takes a filename instead of a rio stream. The

  917. * filename is open for reading and a rio stream object created in order

  918. * to do the actual loading. Moreover the ETA displayed in the INFO

  919. * output is initialized and finalized.

  920. *

  921. * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the

  922. * loading code will fiil the information fields in the structure. */

  923. // wrap rdbLoadRio()

  924. // 从一个文件加载内容

  925. int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {

  926. FILE *fp;

  927. rio rdb;

  928. int retval;

  929. // 打开文件

  930. if ((fp = fopen(filename,"r")) == NULL) return C_ERR;

  931. // 设置开始加载的全局状态

  932. startLoadingFile(fp, filename,rdbflags);

  933. // 用文件fd 初始化rio

  934. rioInitWithFile(&rdb,fp);

  935. // 调用rdbLoadRio加载

  936. retval = rdbLoadRio(&rdb,rdbflags,rsi);

  937. fclose(fp);

  938. stopLoading(retval==C_OK);

  939. return retval;

  940. }

  941. // path: src/server.c

  942. /* Function called at startup to load RDB or AOF file in memory. */

  943. // InitServerLast(); 之后被调用

  944. void loadDataFromDisk(void) {

  945. long long start = ustime();

  946. if (server.aof_state == AOF_ON) {

  947. // 开启AOF时, 加载AOF文件

  948. if (loadAppendOnlyFile(server.aof_filename) == C_OK)

  949. serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);

  950. } else {

  951. // 加载rdb

  952. rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;

  953. errno = 0; /* Prevent a stale value from affecting error checking */

  954. // 从rdb 加载 内容

  955. if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {

  956. // 加载成功

  957. // 打印加载耗时

  958. serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",

  959. (float)(ustime()-start)/1000000);

  960. /* Restore the replication ID / offset from the RDB file. */

  961. // 自己是从节点(配置了masterhost, 或者集群模式下自己是slave), 并且rdb中带有复制信息

  962. if ((server.masterhost ||

  963. (server.cluster_enabled &&

  964. nodeIsSlave(server.cluster->myself))) &&

  965. rsi.repl_id_is_set &&

  966. rsi.repl_offset != -1 &&

  967. /* Note that older implementations may save a repl_stream_db

  968. * of -1 inside the RDB file in a wrong way, see more

  969. * information in function rdbPopulateSaveInfo. */

  970. rsi.repl_stream_db != -1)

  971. {

  972. // 保存自己的replid

  973. memcpy(server.replid,rsi.repl_id,sizeof(server.replid));

  974. server.master_repl_offset = rsi.repl_offset;

  975. /* If we are a slave, create a cached master from this

  976. * information, in order to allow partial resynchronizations

  977. * with masters. */

  978. replicationCacheMasterUsingMyself();

  979. // 切换到rdb 最后操作的db序号

  980. selectDb(server.cached_master,rsi.repl_stream_db);

  981. }

  982. } else if (errno != ENOENT) {

  983. // rdb文件存在但是加载出错, 报错退出

  984. serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));

  985. exit(1);

  986. }

  987. // 不存在就不加载,正常启动

  988. }

  989. }

  990. int main(int argc, char **argv) {

  991. // ...

  992. ACLLoadUsersAtStartup();

  993. InitServerLast();

  994. // 从磁盘加载AOF或者RDB

  995. loadDataFromDisk();

  996. // ...

  997. }

启动加载RDB的调用链:

  1. int main(int argc, char **argv)

  2. - loadDataFromDisk(); // 从磁盘加载AOF或者RDB

  3. - int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) //从 文件加载

  4. // 从rio(文件fd或者socket fd)中加载 rdb 到内存db中

  5. - int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi)

  6. // 循环读取文件内容, 解析成redis kv等不同变量和指令,加载到内存中

  7. - while(1)

  8. - int rdbLoadType(rio *rdb) // 读取kv 的value类型

  9. - key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL) /* Read key */

  10. - val = rdbLoadObject(type,rdb,key) // 加载value, 此时type是具体的value 类型

  11. - // 判断RDB_Type, 根据不同type加载到内存中


定时启动子进程保存rdb

启动子进程生成rdb
  1. //path: src/rdb.c

  2. /* Save a Redis object.

  3. * Returns -1 on error, number of bytes written on success. */

  4. // rdb保存一个具体的Redis object

  5. ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {

  6. ssize_t n = 0, nwritten = 0;

  7. if (o->type == OBJ_STRING) {

  8. /* Save a string value */

  9. // string就保存string

  10. if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;

  11. nwritten += n;

  12. } else if (o->type == OBJ_LIST) {

  13. /* Save a list value */

  14. // 链表,先写入长度

  15. // 再依次写入具体节点

  16. if (o->encoding == OBJ_ENCODING_QUICKLIST) {

  17. quicklist *ql = o->ptr;

  18. quicklistNode *node = ql->head;

  19. if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;

  20. nwritten += n;

  21. while(node) {

  22. // 判断是否是压缩类型, 调用不同string写入方案

  23. if (quicklistNodeIsCompressed(node)) {

  24. void *data;

  25. size_t compress_len = quicklistGetLzf(node, &data);

  26. if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;

  27. nwritten += n;

  28. } else {

  29. if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;

  30. nwritten += n;

  31. }

  32. node = node->next;

  33. }

  34. } else {

  35. serverPanic("Unknown list encoding");

  36. }

  37. } else if (o->type == OBJ_SET) {

  38. /* Save a set value */

  39. if (o->encoding == OBJ_ENCODING_HT) {

  40. dict *set = o->ptr;

  41. dictIterator *di = dictGetIterator(set);

  42. dictEntry *de;

  43. // 先写set长度

  44. if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {

  45. dictReleaseIterator(di);

  46. return -1;

  47. }

  48. nwritten += n;

  49. // 遍历set

  50. while((de = dictNext(di)) != NULL) {

  51. sds ele = dictGetKey(de);

  52. // 写元素string

  53. if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))

  54. == -1)

  55. {

  56. dictReleaseIterator(di);

  57. return -1;

  58. }

  59. nwritten += n;

  60. }

  61. dictReleaseIterator(di);

  62. } else if (o->encoding == OBJ_ENCODING_INTSET) {

  63. size_t l = intsetBlobLen((intset*)o->ptr);

  64. // 压缩的INTSET, 直接写进去

  65. if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;

  66. nwritten += n;

  67. } else {

  68. serverPanic("Unknown set encoding");

  69. }

  70. } else if (o->type == OBJ_ZSET) {

  71. /* Save a sorted set value */

  72. if (o->encoding == OBJ_ENCODING_ZIPLIST) {

  73. size_t l = ziplistBlobLen((unsigned char*)o->ptr);

  74. // 压缩的类型,直接写进去

  75. if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;

  76. nwritten += n;

  77. } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {

  78. zset *zs = o->ptr;

  79. zskiplist *zsl = zs->zsl;

  80. // 先写数量

  81. if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;

  82. nwritten += n;

  83. /* We save the skiplist elements from the greatest to the smallest

  84. * (that's trivial since the elements are already ordered in the

  85. * skiplist): this improves the load process, since the next loaded

  86. * element will always be the smaller, so adding to the skiplist

  87. * will always immediately stop at the head, making the insertion

  88. * O(1) instead of O(log(N)). */

  89. zskiplistNode *zn = zsl->tail;

  90. // 逐个key val遍历

  91. while (zn != NULL) {

  92. // 保存key

  93. if ((n = rdbSaveRawString(rdb,

  94. (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)

  95. {

  96. return -1;

  97. }

  98. nwritten += n;

  99. // 保存value

  100. if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)

  101. return -1;

  102. nwritten += n;

  103. zn = zn->backward;

  104. }

  105. } else {

  106. serverPanic("Unknown sorted set encoding");

  107. }

  108. } else if (o->type == OBJ_HASH) {

  109. /* Save a hash value */

  110. if (o->encoding == OBJ_ENCODING_ZIPLIST) {

  111. // 压缩类型直接写字节流

  112. size_t l = ziplistBlobLen((unsigned char*)o->ptr);

  113. if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;

  114. nwritten += n;

  115. } else if (o->encoding == OBJ_ENCODING_HT) {

  116. dictIterator *di = dictGetIterator(o->ptr);

  117. dictEntry *de;

  118. // 先写元素数量

  119. if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {

  120. dictReleaseIterator(di);

  121. return -1;

  122. }

  123. nwritten += n;

  124. // 遍历元素

  125. while((de = dictNext(di)) != NULL) {

  126. sds field = dictGetKey(de);

  127. sds value = dictGetVal(de);

  128. // 写元素

  129. if ((n = rdbSaveRawString(rdb,(unsigned char*)field,

  130. sdslen(field))) == -1)

  131. {

  132. dictReleaseIterator(di);

  133. return -1;

  134. }

  135. nwritten += n;

  136. if ((n = rdbSaveRawString(rdb,(unsigned char*)value,

  137. sdslen(value))) == -1)

  138. {

  139. dictReleaseIterator(di);

  140. return -1;

  141. }

  142. nwritten += n;

  143. }

  144. dictReleaseIterator(di);

  145. } else {

  146. serverPanic("Unknown hash encoding");

  147. }

  148. } else if (o->type == OBJ_STREAM) {

  149. // stream 类型

  150. // ...

  151. }else if (o->type == OBJ_MODULE) {

  152. // ...

  153. }else {

  154. serverPanic("Unknown object type");

  155. }

  156. return nwritten;

  157. }

  158. /* Save a key-value pair, with expire time, type, key, value.

  159. * On error -1 is returned.

  160. * On success if the key was actually saved 1 is returned. */

  161. // 保存k,v, 并且保存过期时间

  162. int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {

  163. int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;

  164. int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

  165. /* Save the expire time */

  166. if (expiretime != -1) {

  167. // 过期时间不为-1(即设置了过期时间),先写一个过期的opcode

  168. // 再把过期时间写进去

  169. if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;

  170. if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;

  171. }

  172. /* Save the LRU info. */

  173. // 保存LRU时,把object的空闲时间(idle time, 单位为秒)也写进去

  174. if (savelru) {

  175. uint64_t idletime = estimateObjectIdleTime(val);

  176. idletime /= 1000; /* Using seconds is enough and requires less space.*/

  177. if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;

  178. if (rdbSaveLen(rdb,idletime) == -1) return -1;

  179. }

  180. /* Save the LFU info. */

  181. // 保存LFU时,把object访问次数也写进去

  182. if (savelfu) {

  183. uint8_t buf[1];

  184. buf[0] = LFUDecrAndReturn(val);

  185. /* We can encode this in exactly two bytes: the opcode and an 8

  186. * bit counter, since the frequency is logarithmic with a 0-255 range.

  187. * Note that we do not store the halving time because to reset it

  188. * a single time when loading does not affect the frequency much. */

  189. if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;

  190. if (rdbWriteRaw(rdb,buf,1) == -1) return -1;

  191. }

  192. /* Save type, key, value */

  193. // 写val类型比如RDB_TYPE_STRING, RDB_TYPE_ZSET_2

  194. if (rdbSaveObjectType(rdb,val) == -1) return -1;

  195. // 将key写进去(key都是string)

  196. if (rdbSaveStringObject(rdb,key) == -1) return -1;

  197. // 将val object 写到rdb

  198. if (rdbSaveObject(rdb,val,key) == -1) return -1;

  199. /* Delay return if required (for testing) */

  200. if (server.rdb_key_save_delay)

  201. debugDelay(server.rdb_key_save_delay);

  202. return 1;

  203. }

  204. /* Produces a dump of the database in RDB format sending it to the specified

  205. * Redis I/O channel. On success C_OK is returned, otherwise C_ERR

  206. * is returned and part of the output, or all the output, can be

  207. * missing because of I/O errors.

  208. *

  209. * When the function returns C_ERR and if 'error' is not NULL, the

  210. * integer pointed by 'error' is set to the value of errno just after the I/O

  211. * error. */

  212. // 生成rdb 文件

  213. // 首先写magic

  214. // 写全局的变量信息

  215. // 逐个扫描db,写当前db 序号; 扫描db 的所有kv, 写kv

  216. // 最后写eof 和 crc64 checksum

  217. int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {

  218. dictIterator *di = NULL;

  219. dictEntry *de;

  220. char magic[10];

  221. uint64_t cksum;

  222. size_t processed = 0;

  223. int j;

  224. long key_count = 0;

  225. long long info_updated_time = 0;

  226. char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";

  227. if (server.rdb_checksum)

  228. rdb->update_cksum = rioGenericUpdateChecksum;

  229. snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);

  230. if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;

  231. if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;

  232. if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

  233. // 逐个db保存k v

  234. for (j = 0; j < server.dbnum; j++) {

  235. redisDb *db = server.db+j;

  236. dict *d = db->dict;

  237. if (dictSize(d) == 0) continue;

  238. di = dictGetSafeIterator(d);

  239. /* Write the SELECT DB opcode */

  240. // 保存目前db序号

  241. if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;

  242. if (rdbSaveLen(rdb,j) == -1) goto werr;

  243. /* Write the RESIZE DB opcode. */

  244. uint64_t db_size, expires_size;

  245. db_size = dictSize(db->dict);

  246. expires_size = dictSize(db->expires);

  247. // 保存当前db 的dict大小 和expires大小

  248. if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;

  249. if (rdbSaveLen(rdb,db_size) == -1) goto werr;

  250. if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

  251. /* Iterate this DB writing every entry */

  252. // 遍历db 的kv,逐个保存

  253. while((de = dictNext(di)) != NULL) {

  254. sds keystr = dictGetKey(de);

  255. robj key, *o = dictGetVal(de);

  256. long long expire;

  257. initStaticStringObject(key,keystr);

  258. expire = getExpire(db,&key);

  259. if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

  260. /* When this RDB is produced as part of an AOF rewrite, move

  261. * accumulated diff from parent to child while rewriting in

  262. * order to have a smaller final write. */

  263. // rdb 和aof混合模式,及时读取父进程的rewrite buf

  264. if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&

  265. rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)

  266. {

  267. processed = rdb->processed_bytes;

  268. aofReadDiffFromParent();

  269. }

  270. /* Update child info every 1 second (approximately).

  271. * in order to avoid calling mstime() on each iteration, we will

  272. * check the diff every 1024 keys */

  273. // 定时向父进程发送子进程进度信息

  274. if ((key_count++ & 1023) == 0) {

  275. long long now = mstime();

  276. if (now - info_updated_time >= 1000) {

  277. sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname);

  278. info_updated_time = now;

  279. }

  280. }

  281. }

  282. dictReleaseIterator(di);

  283. di = NULL; /* So that we don't release it again on error. */

  284. }

  285. /* If we are storing the replication information on disk, persist

  286. * the script cache as well: on successful PSYNC after a restart, we need

  287. * to be able to process any EVALSHA inside the replication backlog the

  288. * master will send us. */

  289. if (rsi && dictSize(server.lua_scripts)) {

  290. di = dictGetIterator(server.lua_scripts);

  291. while((de = dictNext(di)) != NULL) {

  292. robj *body = dictGetVal(de);

  293. if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)

  294. goto werr;

  295. }

  296. dictReleaseIterator(di);

  297. di = NULL; /* So that we don't release it again on error. */

  298. }

  299. // 保存一些其他全局信息

  300. if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

  301. /* EOF opcode */

  302. // 写eof 代表rdb文件末尾

  303. if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

  304. /* CRC64 checksum. It will be zero if checksum computation is disabled, the

  305. * loading code skips the check in this case. */

  306. cksum = rdb->cksum;

  307. memrev64ifbe(&cksum);

  308. // 写CRC64 checksum,用于校验

  309. if (rioWrite(rdb,&cksum,8) == 0) goto werr;

  310. return C_OK;

  311. werr:

  312. if (error) *error = errno;

  313. if (di) dictReleaseIterator(di);

  314. return C_ERR;

  315. }

  316. /* Save the DB on disk. Return C_ERR on error, C_OK on success. */

  317. // 功能:生成rdb 并且保存在文件 filename

  318. // 首先创建一个临时文件, 调用rioInitWithFile初始化rio

  319. // 再调用rdbSaveRio生成rdb(往rio写内容),实际上写到临时文件

  320. // 最后将临时文件重命名为filename

  321. int rdbSave(char *filename, rdbSaveInfo *rsi) {

  322. char tmpfile[256];

  323. char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */

  324. FILE *fp = NULL;

  325. rio rdb;

  326. int error = 0;

  327. // 创建一个临时文件

  328. snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

  329. fp = fopen(tmpfile,"w");

  330. if (!fp) {

  331. char *cwdp = getcwd(cwd,MAXPATHLEN);

  332. serverLog(LL_WARNING,

  333. "Failed opening the RDB file %s (in server root dir %s) "

  334. "for saving: %s",

  335. filename,

  336. cwdp ? cwdp : "unknown",

  337. strerror(errno));

  338. return C_ERR;

  339. }

  340. // 调用rioInitWithFile初始化rio

  341. rioInitWithFile(&rdb,fp);

  342. startSaving(RDBFLAGS_NONE);

  343. if (server.rdb_save_incremental_fsync)

  344. rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

  345. // 再调用rdbSaveRio生成rdb(往rio写内容),实际上写到临时文件

  346. if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {

  347. errno = error;

  348. goto werr;

  349. }

  350. // 刷盘

  351. /* Make sure data will not remain on the OS's output buffers */

  352. if (fflush(fp)) goto werr;

  353. if (fsync(fileno(fp))) goto werr;

  354. if (fclose(fp)) { fp = NULL; goto werr; }

  355. fp = NULL;

  356. /* Use RENAME to make sure the DB file is changed atomically only

  357. * if the generate DB file is ok. */

  358. // 最后将临时文件重命名为filename

  359. if (rename(tmpfile,filename) == -1) {

  360. char *cwdp = getcwd(cwd,MAXPATHLEN);

  361. serverLog(LL_WARNING,

  362. "Error moving temp DB file %s on the final "

  363. "destination %s (in server root dir %s): %s",

  364. tmpfile,

  365. filename,

  366. cwdp ? cwdp : "unknown",

  367. strerror(errno));

  368. unlink(tmpfile);

  369. stopSaving(0);

  370. return C_ERR;

  371. }

  372. serverLog(LL_NOTICE,"DB saved on disk");

  373. server.dirty = 0;

  374. server.lastsave = time(NULL);

  375. server.lastbgsave_status = C_OK;

  376. stopSaving(1);

  377. return C_OK;

  378. werr:

  379. serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));

  380. if (fp) fclose(fp);

  381. unlink(tmpfile);

  382. stopSaving(0);

  383. return C_ERR;

  384. }

  385. // 开一个后台进程生成rdb,并且保存到文件中

  386. int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {

  387. pid_t childpid;

  388. // 判断已经有子进程, 返回错误

  389. if (hasActiveChildProcess()) return C_ERR;

  390. server.dirty_before_bgsave = server.dirty;

  391. server.lastbgsave_try = time(NULL);

  392. // fork 一个子进程

  393. if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {

  394. int retval;

  395. /* Child */

  396. // 设置进程名字 cpu亲和性

  397. redisSetProcTitle("redis-rdb-bgsave");

  398. redisSetCpuAffinity(server.bgsave_cpulist);

  399. // 调用rdbSave 生成rdb

  400. retval = rdbSave(filename,rsi);

  401. if (retval == C_OK) {

  402. // 向父进程发送子进程的写时复制(COW)内存信息; rdb是否成功由子进程退出码告知父进程

  403. sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");

  404. }

  405. exitFromChild((retval == C_OK) ? 0 : 1);

  406. } else {

  407. /* Parent */

  408. if (childpid == -1) {

  409. server.lastbgsave_status = C_ERR;

  410. serverLog(LL_WARNING,"Can't save in background: fork: %s",

  411. strerror(errno));

  412. return C_ERR;

  413. }

  414. serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);

  415. server.rdb_save_time_start = time(NULL);

  416. server.rdb_child_type = RDB_CHILD_TYPE_DISK;

  417. return C_OK;

  418. }

  419. return C_OK; /* unreached */

  420. }

  421. // path:src/server.c

  422. int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

  423. // ...

  424. /* Check if a background saving or AOF rewrite in progress terminated. */

  425. if (hasActiveChildProcess() || ldbPendingChildren())

  426. {

  427. run_with_period(1000) receiveChildInfo();

  428. checkChildrenDone();

  429. } else {

  430. /* If there is not a background saving/rewrite in progress check if

  431. * we have to save/rewrite now. */

  432. // 遍历rdb 保存(生成)策略

  433. for (j = 0; j < server.saveparamslen; j++) {

  434. struct saveparam *sp = server.saveparams+j;

  435. /* Save if we reached the given amount of changes,

  436. * the given amount of seconds, and if the latest bgsave was

  437. * successful or if, in case of an error, at least

  438. * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */

  439. // 达到时间要求

  440. // change数量要求

  441. // 并且(上次bg save正常 或者 已经有CONFIG_BGSAVE_RETRY_DELAY的时间没有bgsave)

  442. if (server.dirty >= sp->changes &&

  443. server.unixtime-server.lastsave > sp->seconds &&

  444. (server.unixtime-server.lastbgsave_try >

  445. CONFIG_BGSAVE_RETRY_DELAY ||

  446. server.lastbgsave_status == C_OK))

  447. {

  448. serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",

  449. sp->changes, (int)sp->seconds);

  450. rdbSaveInfo rsi, *rsiptr;

  451. rsiptr = rdbPopulateSaveInfo(&rsi);

  452. // 开启bg save

  453. rdbSaveBackground(server.rdb_filename,rsiptr);

  454. break;

  455. }

  456. }

  457. // ...

  458. }

  459. // ...

  460. }

启动子进程保存rdb文件的调用链为:

  1. int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) // cron,定时执行

  2. -int rdbSaveBackground(char *filename, rdbSaveInfo *rsi)// 没有其他子进程时,遍历bg save策略,如果达到要求,开启bg save

  3. - int redisFork(int purpose) //fork子进程

  4. - int rdbSave(char *filename, rdbSaveInfo *rsi) //子进程调用此生成rdb 并且保存在文件 filename

  5. - fp = fopen(tmpfile,"w");// 创建一个临时文件,获得句柄

  6. - rioInitWithFile(&rdb,fp);// 使用fd初始化rio

  7. - int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi)// 生成rdb 文件

  8. - // 首先写magic

  9. - // 写全局的变量信息

  10. - for (j = 0; j < server.dbnum; j++)// 逐个扫描db,

  11. - rdbSaveType(rdb,RDB_OPCODE_SELECTDB) //写当前db 序号;

  12. - while(***) //扫描db 的所有kv, 写kv

  13. - int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,long long expiretime)// 保存k&v,并且保存过期时间

  14. - rdbSaveObjectType(rdb,val) // 写val类型比如RDB_TYPE_STRING, RDB_TYPE_ZSET_2

  15. - rdbSaveStringObject(rdb,key) // 将key写进去(key都是string)

  16. - rdbSaveObject(rdb,val,key) // 将val object 写到rdb

  17. - rdb保存一个具体的Redis object

  18. - fflush(fp) // 到这里rdb生成完了,刷盘

  19. - rename(tmpfile,filename) // 最后将临时文件重命名为filename

  20. - exitFromChild((retval == C_OK) ? 0 : 1);// 退出子进程

子进程处理完毕后主进程处理
  1. //path: src/rdb.c

  2. /* A background saving child (BGSAVE) terminated its work. Handle this.

  3. * This function covers the case of actual BGSAVEs. */

  4. // bg save 保存完rdb 后主线程调用handle处理

  5. static void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {

  6. if (!bysignal && exitcode == 0) {

  7. // 保存rdb 成功

  8. serverLog(LL_NOTICE,

  9. "Background saving terminated with success");

  10. // rdb子进程开启后产生的change数量

  11. server.dirty = server.dirty - server.dirty_before_bgsave;

  12. // 记录最后保存时间

  13. server.lastsave = time(NULL);

  14. server.lastbgsave_status = C_OK;

  15. } else if (!bysignal && exitcode != 0) {

  16. // 保存失败

  17. serverLog(LL_WARNING, "Background saving error");

  18. server.lastbgsave_status = C_ERR;

  19. } else {

  20. mstime_t latency;

  21. // 主动被信号kill

  22. serverLog(LL_WARNING,

  23. "Background saving terminated by signal %d", bysignal);

  24. latencyStartMonitor(latency);

  25. // 删除rdb临时文件

  26. rdbRemoveTempFile(server.child_pid, 0);

  27. latencyEndMonitor(latency);

  28. latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);

  29. /* SIGUSR1 is whitelisted, so we have a way to kill a child without

  30. * triggering an error condition. */

  31. if (bysignal != SIGUSR1)

  32. server.lastbgsave_status = C_ERR;

  33. }

  34. }

  35. /* A background saving child (BGSAVE) terminated its work. Handle this.

  36. * This function covers the case of RDB -> Slaves socket transfers for

  37. * diskless replication. */

  38. // bgsave保存rdb发送给副本执行完成后, 主线程会执行这个handle

  39. static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {

  40. if (!bysignal && exitcode == 0) {

  41. // 成功

  42. serverLog(LL_NOTICE,

  43. "Background RDB transfer terminated with success");

  44. } else if (!bysignal && exitcode != 0) {

  45. // 异常

  46. serverLog(LL_WARNING, "Background transfer error");

  47. } else {

  48. // 被信号kill

  49. serverLog(LL_WARNING,

  50. "Background transfer terminated by signal %d", bysignal);

  51. }

  52. // 关闭通讯句柄

  53. if (server.rdb_child_exit_pipe!=-1)

  54. close(server.rdb_child_exit_pipe);

  55. // 删除子进程给父(主进程)发送rdb的fd

  56. aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);

  57. close(server.rdb_pipe_read);

  58. server.rdb_child_exit_pipe = -1;

  59. server.rdb_pipe_read = -1;

  60. zfree(server.rdb_pipe_conns);

  61. server.rdb_pipe_conns = NULL;

  62. server.rdb_pipe_numconns = 0;

  63. server.rdb_pipe_numconns_writing = 0;

  64. //释放缓冲器: 从子进程读取rdb内容缓冲区

  65. zfree(server.rdb_pipe_buff);

  66. server.rdb_pipe_buff = NULL;

  67. server.rdb_pipe_bufflen = 0;

  68. }

  69. /* When a background RDB saving/transfer terminates, call the right handler. */

  70. // bg rdb 子进程完成后,会调用这个handle

  71. // handle在根据具体的类型: 存盘, socket调用不同的handle处理

  72. void backgroundSaveDoneHandler(int exitcode, int bysignal) {

  73. int type = server.rdb_child_type;

  74. switch(server.rdb_child_type) {

  75. // rdb 保存在磁盘文件的子进程结束

  76. // 本节聚焦于 RDB_CHILD_TYPE_DISK的处理

  77. case RDB_CHILD_TYPE_DISK:

  78. backgroundSaveDoneHandlerDisk(exitcode,bysignal);

  79. break;

  80. case RDB_CHILD_TYPE_SOCKET:

  81. backgroundSaveDoneHandlerSocket(exitcode,bysignal);

  82. break;

  83. default:

  84. serverPanic("Unknown RDB child type.");

  85. break;

  86. }

  87. server.rdb_child_type = RDB_CHILD_TYPE_NONE;

  88. server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;

  89. server.rdb_save_time_start = -1;

  90. /* Possibly there are slaves waiting for a BGSAVE in order to be served

  91. * (the first stage of SYNC is a bulk transfer of dump.rdb) */

  92. // 处理接收rdb 的副本

  93. // 如果是diskless, 子进程运行完毕也说明发送完毕

  94. // 非diskless,监听副本tcp fd写事件发送 rdb

  95. updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, type);

  96. }

  97. // 判断子进程是否退出并做处理

  98. void checkChildrenDone(void) {

  99. int statloc = 0;

  100. pid_t pid;

  101. // 调用 waitpid 获取子进程状态

  102. if ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {

  103. int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;

  104. int bysignal = 0;

  105. if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

  106. /* sigKillChildHandler catches the signal and calls exit(), but we

  107. * must make sure not to flag lastbgsave_status, etc incorrectly.

  108. * We could directly terminate the child process via SIGUSR1

  109. * without handling it */

  110. if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {

  111. bysignal = SIGUSR1;

  112. exitcode = 1;

  113. }

  114. if (pid == -1) {

  115. serverLog(LL_WARNING,"waitpid() returned an error: %s. "

  116. "child_type: %s, child_pid = %d",

  117. strerror(errno),

  118. strChildType(server.child_type),

  119. (int) server.child_pid);

  120. } else if (pid == server.child_pid) {

  121. if (server.child_type == CHILD_TYPE_RDB) {

  122. // rdb 子进程退出处理

  123. backgroundSaveDoneHandler(exitcode, bysignal);

  124. } else if (server.child_type == CHILD_TYPE_AOF) {

  125. // 收到aof rewrite子进程退出

  126. // 执行aof rewrite后主进程处理

  127. backgroundRewriteDoneHandler(exitcode, bysignal);

  128. } else if (server.child_type == CHILD_TYPE_MODULE) {

  129. ModuleForkDoneHandler(exitcode, bysignal);

  130. } else {

  131. serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);

  132. exit(1);

  133. }

  134. if (!bysignal && exitcode == 0) receiveChildInfo();

  135. resetChildState();

  136. } else {

  137. if (!ldbRemoveChild(pid)) {

  138. serverLog(LL_WARNING,

  139. "Warning, detected child with unmatched pid: %ld",

  140. (long) pid);

  141. }

  142. }

  143. /* start any pending forks immediately. */

  144. // 如果有副本正在等待bgsave(WAIT_BGSAVE_START)且当前没有子进程,

  145. // 立即启动待执行的fork(新的bgsave), 不必等到下一次cron

  146. replicationStartPendingFork();

  147. }

  148. }

  149. // path:src/server.c

  150. int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

  151. // ...

  152. /* Check if a background saving or AOF rewrite in progress terminated. */

  153. if (hasActiveChildProcess() || ldbPendingChildren())

  154. {

  155. run_with_period(1000) receiveChildInfo();

  156. checkChildrenDone();

  157. } else {

  158. //...

  159. }

  160. // ...

  161. }

父进程处理bg save(文件)进程结束的调用链为:

  1. int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) //cron 定时执行

  2. - void checkChildrenDone(void) // 判断子进程是否退出并做处理

  3. - void backgroundSaveDoneHandler(int exitcode, int bysignal) // bg rdb 子进程完成后,会调用这个handle

  4. - void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) // bg save(disk) 保存完rdb 后主线程调用handle处理

  5. - server.dirty = server.dirty - server.dirty_before_bgsave; // rdb子进程开启后产生的change数量

  6. - server.lastsave = time(NULL); // 记录最后保存时间


save命令和bgsave命令

  1. // path: src/rdb.c

  2. // save命令,即同步执行保存rdb文件

  3. // 阻塞主进程

  4. void saveCommand(client *c) {

  5. if (server.child_type == CHILD_TYPE_RDB) {

  6. addReplyError(c,"Background save already in progress");

  7. return;

  8. }

  9. rdbSaveInfo rsi, *rsiptr;

  10. rsiptr = rdbPopulateSaveInfo(&rsi);

  11. // 直接调用rdbSave同步保存

  12. if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {

  13. addReply(c,shared.ok);

  14. } else {

  15. addReplyErrorObject(c,shared.err);

  16. }

  17. }

  18. /* BGSAVE [SCHEDULE] */

  19. // bgsave命令,即在后台异步执行保存rdb文件

  20. // 不阻塞主进程(fork子进程处理)

  21. void bgsaveCommand(client *c) {

  22. int schedule = 0;

  23. /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite

  24. * is in progress. Instead of returning an error a BGSAVE gets scheduled. */

  25. if (c->argc > 1) {

  26. if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {

  27. schedule = 1;

  28. } else {

  29. addReplyErrorObject(c,shared.syntaxerr);

  30. return;

  31. }

  32. }

  33. rdbSaveInfo rsi, *rsiptr;

  34. rsiptr = rdbPopulateSaveInfo(&rsi);

  35. // 已经有rdb子进程了,报错

  36. if (server.child_type == CHILD_TYPE_RDB) {

  37. addReplyError(c,"Background save already in progress");

  38. } else if (hasActiveChildProcess()) {

  39. // 存在其他子进程,判断是否可以等会执行

  40. if (schedule) {

  41. // 存在其他子进程,设置server.rdb_bgsave_scheduled

  42. // serverCron定期检查没有子进程并且rdb_bgsave_scheduled==1

  43. // 调用rdbSaveBackground 开启子进程保存rdb

  44. server.rdb_bgsave_scheduled = 1;

  45. addReplyStatus(c,"Background saving scheduled");

  46. } else {

  47. addReplyError(c,

  48. "Another child process is active (AOF?): can't BGSAVE right now. "

  49. "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "

  50. "possible.");

  51. }

  52. } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {

  53. // 调用rdbSaveBackground 开启子进程保存rdb

  54. addReplyStatus(c,"Background saving started");

  55. } else {

  56. addReplyErrorObject(c,shared.err);

  57. }

  58. }

  59. // path:src/server.c

  60. int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

  61. // ...

  62. /* Start a scheduled BGSAVE if the corresponding flag is set. This is

  63. * useful when we are forced to postpone a BGSAVE because an AOF

  64. * rewrite is in progress.

  65. *

  66. * Note: this code must be after the replicationCron() call above so

  67. * make sure when refactoring this file to keep this order. This is useful

  68. * because we want to give priority to RDB savings for replication. */

  69. // 没有子进程

  70. // 并且被设置了调度执行(bgsave command)

  71. if (!hasActiveChildProcess() &&

  72. server.rdb_bgsave_scheduled &&

  73. (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||

  74. server.lastbgsave_status == C_OK))

  75. {

  76. rdbSaveInfo rsi, *rsiptr;

  77. rsiptr = rdbPopulateSaveInfo(&rsi);

  78. // 调用rdbSaveBackground 产生子进程save rdb

  79. if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)

  80. server.rdb_bgsave_scheduled = 0;

  81. }

  82. // ...

  83. }

save
  • save命令直接在主进程遍历db生成rdb

bgsave
  • 在当前没有子进程运行时, 直接调用rdbSaveBackground开启子进程保存rdb

  • 如果存在其他子进程且指定了SCHEDULE选项, 设置server.rdb_bgsave_scheduled, 即子进程结束后执行bg save

  • serverCron定期检查没有子进程并且rdb_bgsave_scheduled为1时调用rdbSaveBackground开启子进程保存rdb


diskless模式rdb处理

diskless模式只用于副本全量同步,本节聚焦关于diskless rdb处理(主从同步细节后续文章会细讲)

启动子进程处理diskless rdb
  1. // path: src/rdb.c

  2. /* This is just a wrapper to rdbSaveRio() that additionally adds a prefix

  3. * and a suffix to the generated RDB dump. The prefix is:

  4. *

  5. * $EOF:<40 bytes unguessable hex string>\r\n

  6. *

  7. * While the suffix is the 40 bytes hex string we announced in the prefix.

  8. * This way processes receiving the payload can understand when it ends

  9. * without doing any processing of the content. */

  10. // wrap rdbSaveRio()

  11. // 在rdb 前后分别加上前缀和后缀

  12. // 可以方便识别文件什么时候结束而不必加载执行rdb

  13. int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {

  14. char eofmark[RDB_EOF_MARK_SIZE];

  15. startSaving(RDBFLAGS_REPLICATION);

  16. getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);

  17. if (error) *error = 0;

  18. if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;

  19. if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;

  20. if (rioWrite(rdb,"\r\n",2) == 0) goto werr;

  21. if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;

  22. if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;

  23. stopSaving(1);

  24. return C_OK;

  25. werr: /* Write error. */

  26. /* Set 'error' only if not already set by rdbSaveRio() call. */

  27. if (error && *error == 0) *error = errno;

  28. stopSaving(0);

  29. return C_ERR;

  30. }

  31. /* Spawn an RDB child that writes the RDB to the sockets of the slaves

  32. * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */

  33. // 生成rdb 通过pipe fd 发给主进程, 主进程再将rdb通过socket发送给副本

  34. int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {

  35. listNode *ln;

  36. listIter li;

  37. pid_t childpid;

  38. int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;

  39. // 存在子进程,返回错误

  40. if (hasActiveChildProcess()) return C_ERR;

  41. /* Even if the previous fork child exited, don't start a new one until we

  42. * drained the pipe. */

  43. if (server.rdb_pipe_conns) return C_ERR;

  44. /* Before to fork, create a pipe that is used to transfer the rdb bytes to

  45. * the parent, we can't let it write directly to the sockets, since in case

  46. * of TLS we must let the parent handle a continuous TLS state when the

  47. * child terminates and parent takes over. */

  48. if (pipe(pipefds) == -1) return C_ERR;

  49. // 创建父子进程通讯的句柄(发送rdb内容)

  50. server.rdb_pipe_read = pipefds[0]; /* read end */

  51. rdb_pipe_write = pipefds[1]; /* write end */

  52. // 不阻塞

  53. anetNonBlock(NULL, server.rdb_pipe_read);

  54. /* create another pipe that is used by the parent to signal to the child

  55. * that it can exit. */

  56. // 创建用于父进程通知子进程是否退出的fd

  57. if (pipe(pipefds) == -1) {

  58. close(rdb_pipe_write);

  59. close(server.rdb_pipe_read);

  60. return C_ERR;

  61. }

  62. safe_to_exit_pipe = pipefds[0]; /* read end */

  63. server.rdb_child_exit_pipe = pipefds[1]; /* write end */

  64. /* Collect the connections of the replicas we want to transfer

  65. * the RDB to, which are i WAIT_BGSAVE_START state. */

  66. server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));

  67. server.rdb_pipe_numconns = 0;

  68. server.rdb_pipe_numconns_writing = 0;

  69. // 遍历当前副本列表,找出那些SLAVE_STATE_WAIT_BGSAVE_START的副本, 告诉他们准备接收全量rdb

  70. listRewind(server.slaves,&li);

  71. while((ln = listNext(&li))) {

  72. client *slave = ln->value;

  73. // 处于SLAVE_STATE_WAIT_BGSAVE_START,即等待全量加载rdb

  74. if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {

  75. // 保存在server.rdb_pipe_conns

  76. // 后续收到子进程rdb buf直接遍历发送

  77. server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;

  78. // 告诉他们准备接收全量rdb

  79. // 并且设置为SLAVE_STATE_WAIT_BGSAVE_END

  80. replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());

  81. }

  82. }

  83. // 创建子进程用于保存rdb

  84. /* Create the child process. */

  85. if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {

  86. /* Child */

  87. int retval, dummy;

  88. rio rdb;

  89. // 初始化rio,写的fd其实是与父进程通讯的fd

  90. rioInitWithFd(&rdb,rdb_pipe_write);

  91. // 设置进程名字和cpu亲和性

  92. redisSetProcTitle("redis-rdb-to-slaves");

  93. redisSetCpuAffinity(server.bgsave_cpulist);

  94. // 扫描db,生成rdb,写入rio

  95. retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);

  96. if (retval == C_OK && rioFlush(&rdb) == 0)

  97. retval = C_ERR;

  98. // 发送结束消息

  99. if (retval == C_OK) {

  100. sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");

  101. }

  102. // 释放rio

  103. rioFreeFd(&rdb);

  104. /* wake up the reader, tell it we're done. */

  105. close(rdb_pipe_write);

  106. close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */

  107. /* hold exit until the parent tells us it's safe. we're not expecting

  108. * to read anything, just get the error when the pipe is closed. */

  109. // 等待父进程通知退出

  110. // 父进程读取完最后rdb内容,读到eof就会关闭rdb_child_exit_pipe, 子进程在safe_to_exit_pipe上读到EOF后退出

  111. dummy = read(safe_to_exit_pipe, pipefds, 1);

  112. UNUSED(dummy);

  113. exitFromChild((retval == C_OK) ? 0 : 1);

  114. } else {

  115. /* Parent */

  116. close(safe_to_exit_pipe);

  117. if (childpid == -1) {

  118. // 创建子进程失败

  119. serverLog(LL_WARNING,"Can't save in background: fork: %s",

  120. strerror(errno));

  121. /* Undo the state change. The caller will perform cleanup on

  122. * all the slaves in BGSAVE_START state, but an early call to

  123. * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */

  124. // 遍历副本,将已被改为SLAVE_STATE_WAIT_BGSAVE_END的副本状态恢复为SLAVE_STATE_WAIT_BGSAVE_START

  125. listRewind(server.slaves,&li);

  126. while((ln = listNext(&li))) {

  127. client *slave = ln->value;

  128. if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {

  129. slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;

  130. }

  131. }

  132. // 关闭通讯句柄

  133. close(rdb_pipe_write);

  134. close(server.rdb_pipe_read);

  135. zfree(server.rdb_pipe_conns);

  136. server.rdb_pipe_conns = NULL;

  137. server.rdb_pipe_numconns = 0;

  138. server.rdb_pipe_numconns_writing = 0;

  139. } else {

  140. serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",

  141. (long) childpid);

  142. server.rdb_save_time_start = time(NULL);

  143. server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;

  144. close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */

  145. // 创建成功, 设置监听rdb_pipe_read的handler 接收子进程发送过来的rdb 内容

  146. if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {

  147. serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");

  148. }

  149. }

  150. return (childpid == -1) ? C_ERR : C_OK;

  151. }

  152. return C_OK; /* Unreached. */

  153. }

diskless 启动子进程处理调用链为:

  1. int rdbSaveToSlavesSockets(rdbSaveInfo *rsi)// 生成rdb 通过pipe fd 发给主进程, 主进程再将rdb通过socket发送给副本

  2. - server.rdb_pipe_read = pipefds[0]; rdb_pipe_write = pipefds[1]; // 创建子进程向父进程发送rdb的 pipe

  3. - safe_to_exit_pipe = pipefds[0]; server.rdb_child_exit_pipe = pipefds[1]; // 创建用于父进程通知子进程是否退出的fd

  4. - while() //遍历副本client,找出那些SLAVE_STATE_WAIT_BGSAVE_START的副本, 告诉他们准备接收全量rdb

  5. - 将符合条件的副本客户端保存在server.rdb_pipe_conns //后续收到子进程rdb buf直接遍历发送

  6. - 发送协议告诉他们准备接收全量rdb,并且设置为SLAVE_STATE_WAIT_BGSAVE_END

  7. - int redisFork(int purpose) //fork子进程

  8. - rioInitWithFd(&rdb,rdb_pipe_write); // 初始化rio, rdb 写的fd其实是与父进程通讯的pipe fd

  9. - int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi)// 在rdb 前后分别加上前缀和后缀,可以方便识别文件结束而不必加载rdb

  10. - rio(rdb)写前缀

  11. - int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi)// 生成rdb 文件

  12. - // 首先写magic

  13. - // 写全局的变量信息

  14. - for (j = 0; j < server.dbnum; j++)// 逐个扫描db,

  15. - rdbSaveType(rdb,RDB_OPCODE_SELECTDB) //写当前db 序号;

  16. - while(***) //扫描db 的所有kv, 写kv

  17. - int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,long long expiretime)// 保存k&v,并且保存过期时间

  18. - rdbSaveObjectType(rdb,val) // 写val类型比如RDB_TYPE_STRING, RDB_TYPE_ZSET_2

  19. - rdbSaveStringObject(rdb,key) // 将key写进去(key都是string)

  20. - rdbSaveObject(rdb,val,key) // 将val object 写到rdb

  21. - rdb保存一个具体的Redis object

  22. - rio(rdb)写后缀

  23. - close(rdb_pipe_write);// 关闭pipe写句柄,告诉父进程rdb结束了(EOF)

  24. - dummy = read(safe_to_exit_pipe, pipefds, 1); // 父进程读取完最后rdb内容,读到eof就会关闭rdb_child_exit_pipe, 子进程在此读到EOF后退出

  25. - exitFromChild((retval == C_OK) ? 0 : 1);// 退出子进程

与默认的生成rdb文件模式相比,diskless模式启动子进程处理的核心区别是:

  • 创建子进程向父进程发送rdb内容pipe

  • 创建父进程告诉子进程可以正常退出的pipe

  • 确定哪些副本client会接收本次rdb 内容

  • 调用rdbSaveRioWithEOFMark生成有前后缀的rdb,方便副本接收rdb时知道什么时候接收完

  • 子进程通过关闭向父进程发送rdb内容pipe的fd 告诉父进程rdb 生成完毕

  • 子进程等待父进程通知才退出

diskless 父进程接收rdb
  1. // path: src/replication.c

  2. // 主进程收到子进程的rdb内容时调用

  3. // 将读取到的rdb 内容发送给本次接收rdb的副本client

  4. /* Called in diskless master, when there's data to read from the child's rdb pipe */

  5. void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {

  6. UNUSED(mask);

  7. UNUSED(clientData);

  8. UNUSED(eventLoop);

  9. int i;

  10. if (!server.rdb_pipe_buff)

  11. server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);

  12. serverAssert(server.rdb_pipe_numconns_writing==0);

  13. while (1) {

  14. server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);

  15. if (server.rdb_pipe_bufflen < 0) {

  16. if (errno == EAGAIN || errno == EWOULDBLOCK)

  17. return;

  18. serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));

  19. // 读取rdb内容异常

  20. // 直接关闭本次接收rdb的副本 client

  21. for (i=0; i < server.rdb_pipe_numconns; i++) {

  22. connection *conn = server.rdb_pipe_conns[i];

  23. if (!conn)

  24. continue;

  25. client *slave = connGetPrivateData(conn);

  26. freeClient(slave);

  27. server.rdb_pipe_conns[i] = NULL;

  28. }

  29. // kill rdb子进程

  30. killRDBChild();

  31. return;

  32. }

  33. if (server.rdb_pipe_bufflen == 0) {

  34. //收到字节数为0,代表子进程rdb已经发送完毕

  35. /* EOF - write end was closed. */

  36. int stillUp = 0;

  37. // 删除监听读事件

  38. aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);

  39. for (i=0; i < server.rdb_pipe_numconns; i++)

  40. {

  41. connection *conn = server.rdb_pipe_conns[i];

  42. if (!conn)

  43. continue;

  44. stillUp++;

  45. }

  46. serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);

  47. /* Now that the replicas have finished reading, notify the child that it's safe to exit.

  48. * When the server detectes the child has exited, it can mark the replica as online, and

  49. * start streaming the replication buffers. */

  50. // 关闭rdb_child_exit_pipe

  51. // 告诉子进程正常退出

  52. close(server.rdb_child_exit_pipe);

  53. server.rdb_child_exit_pipe = -1;

  54. return;

  55. }

  56. // 将读取到的rdb 内容发送给本次接收rdb的副本

  57. int stillAlive = 0;

  58. for (i=0; i < server.rdb_pipe_numconns; i++)

  59. {

  60. int nwritten;

  61. connection *conn = server.rdb_pipe_conns[i];

  62. if (!conn)

  63. continue;

  64. // 尽量给副本tcp发送内容

  65. client *slave = connGetPrivateData(conn);

  66. if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {

  67. if (connGetState(conn) != CONN_STATE_CONNECTED) {

  68. serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",

  69. connGetLastError(conn));

  70. freeClient(slave);

  71. server.rdb_pipe_conns[i] = NULL;

  72. continue;

  73. }

  74. /* An error and still in connected state, is equivalent to EAGAIN */

  75. slave->repldboff = 0;

  76. } else {

  77. /* Note: when use diskless replication, 'repldboff' is the offset

  78. * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */

  79. slave->repldboff = nwritten;

  80. atomicIncr(server.stat_net_output_bytes, nwritten);

  81. }

  82. /* If we were unable to write all the data to one of the replicas,

  83. * setup write handler (and disable pipe read handler, below) */

  84. // 写到不可写, 设置监听副本客户端可写事件继续发送

  85. if (nwritten != server.rdb_pipe_bufflen) {

  86. slave->repl_last_partial_write = server.unixtime;

  87. server.rdb_pipe_numconns_writing++;

  88. connSetWriteHandler(conn, rdbPipeWriteHandler);

  89. }

  90. stillAlive++;

  91. }

  92. if (stillAlive == 0) {

  93. serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");

  94. // 如果需要接收的副本客户端都异常了,不用继续生成rdb了

  95. // 直接kill子进程

  96. killRDBChild();

  97. }

  98. /* Remove the pipe read handler if at least one write handler was set. */

  99. if (server.rdb_pipe_numconns_writing || stillAlive == 0) {

  100. aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);

  101. break;

  102. }

  103. }

  104. }

  105. // path: src/rdb.c

  106. /* Spawn an RDB child that writes the RDB to the sockets of the slaves

  107. * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */

  108. // 生成rdb 通过socket发送给副本

  109. int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {

  110. listNode *ln;

  111. // 创建父子进程通讯的pipe等初始化工作

  112. //...

  113. // 创建子进程用于保存rdb

  114. /* Create the child process. */

  115. if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {

  116. // 子进程处理逻辑

  117. //....

  118. } else {

  119. /* Parent */

  120. close(safe_to_exit_pipe);

  121. if (childpid == -1) {

  122. // 创建子进程失败处理逻辑

  123. // 关闭句柄,记录错误之类

  124. // ...

  125. } else {

  126. serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",

  127. (long) childpid);

  128. server.rdb_save_time_start = time(NULL);

  129. server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;

  130. close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */

  131. // 创建成功, 设置监听rdb_pipe_read的handler 接收子进程发送过来的rdb 内容

  132. if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {

  133. serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");

  134. }

  135. }

  136. return (childpid == -1) ? C_ERR : C_OK;

  137. }

  138. return C_OK; /* Unreached. */

  139. }

父进程接收rdb内容handle设置:

  • 在rdbSaveToSlavesSockets创建子进程成功后,设置 server.rdb_pipe_read的读事件处理handle为rdbPipeReadHandler

rdbPipeReadHandler处理如下:

  • 通过 server.rdb_pipe_read读取rdb内容保存到server.rdb_pipe_buff中

  • 将读取到的rdb 内容发送给本次接收rdb的副本client 们

  • 某些副本client tcp已经不可写,监听其可写事件,设置rdbPipeWriteHandler, client可写时继续发送rdb

  • 如果父进程读到子进程pipe eof内容, 删除对server.rdb_pipe_read监听, 关闭server.rdb_child_exit_pipe告诉子进程正常退出

diskless rdb结束后父进程处理
  1. // path src/rdb.c

  2. /* A background saving child (BGSAVE) terminated its work. Handle this.

  3. * This function covers the case of RDB -> Slaves socket transfers for

  4. * diskless replication. */

  5. // bgsave保存rdb发送给副本执行完成后, 主线程会执行这个handle

  6. static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {

  7. if (!bysignal && exitcode == 0) {

  8. // 成功

  9. serverLog(LL_NOTICE,

  10. "Background RDB transfer terminated with success");

  11. } else if (!bysignal && exitcode != 0) {

  12. // 异常

  13. serverLog(LL_WARNING, "Background transfer error");

  14. } else {

  15. // 被信号kill

  16. serverLog(LL_WARNING,

  17. "Background transfer terminated by signal %d", bysignal);

  18. }

  19. // 关闭通讯句柄

  20. if (server.rdb_child_exit_pipe!=-1)

  21. close(server.rdb_child_exit_pipe);

  22. // 删除子进程给父(主进程)发送rdb的fd

  23. aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);

  24. close(server.rdb_pipe_read);

  25. server.rdb_child_exit_pipe = -1;

  26. server.rdb_pipe_read = -1;

  27. zfree(server.rdb_pipe_conns);

  28. server.rdb_pipe_conns = NULL;

  29. server.rdb_pipe_numconns = 0;

  30. server.rdb_pipe_numconns_writing = 0;

  31. //释放缓冲器: 从子进程读取rdb内容缓冲区

  32. zfree(server.rdb_pipe_buff);

  33. server.rdb_pipe_buff = NULL;

  34. server.rdb_pipe_bufflen = 0;

  35. }

父进程处理bg save(diskless)进程结束的调用链为:

  1. int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) //cron 定时执行

  2. - void checkChildrenDone(void) // 判断子进程是否退出并做处理

  3. - void backgroundSaveDoneHandler(int exitcode, int bysignal) // bg rdb 子进程完成后,会调用这个handle

  4. - void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) // bg save(diskless) 保存完rdb 后主线程调用handle处理

  5. - close(server.rdb_pipe_read); //关闭读取rdb的pipe句柄

  6. - zfree(server.rdb_pipe_conns); // 回收本次接收rdb的副本连接数组内存

  7. - zfree(server.rdb_pipe_buff); // 回收接收rdb用的buf


原文始发于微信公众号(吃瓜技术派):一文吃透Redis RDB持久化:从原理到源码的全方位解析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/236037.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!