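Redis Source Code Walkthrough: Master-Replica Switchover in Non-Cluster Mode

Switching master and replica with FAILOVER

The Redis FAILOVER command (available since Redis 6.2) manually triggers a master-replica switchover. For example, when the master needs maintenance or an upgrade, you can switch roles manually first and switch back once the upgrade is done.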

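1. Basic syntax

  FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>]

  • The command must be run on a master: the node is not itself a replica, and at least one replica is connected to it.

  • TO HOST PORT: promotes one specific replica to master. If omitted, the master fails over to the first connected replica it finds that has caught up.

  • FORCE: if the target replica still has not caught up when the TIMEOUT deadline is reached, the switchover is performed anyway.

  • FORCE can only be used together with both TO HOST PORT and TIMEOUT.

  • TIMEOUT: the deadline for the switchover, in milliseconds. By default there is no timeout: client writes stay paused until a replica meets the switchover conditions. With a timeout and without FORCE, the switchover is abandoned at the deadline. With a timeout and FORCE, the switchover is forced at the deadline without waiting for the replica to finish syncing.

  • ABORT: aborts a failover (master-replica switchover) that is in progress and unpauses client writes.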
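The constraint between FORCE, TO and TIMEOUT can be restated as a small predicate. The following is a minimal sketch, not Redis code: the `failover_opts` struct and the `failover_opts_valid` function are hypothetical names introduced only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical container for parsed FAILOVER options (not a Redis type). */
typedef struct {
    const char *host;  /* NULL when TO <HOST> <PORT> was not given */
    long port;         /* only meaningful when host != NULL */
    long timeout_ms;   /* 0 when TIMEOUT was not given */
    bool force;        /* true when FORCE was given */
} failover_opts;

/* Returns true when the option combination is acceptable:
 * a negative timeout is rejected, and FORCE needs both a
 * target (TO) and a deadline (TIMEOUT). */
bool failover_opts_valid(const failover_opts *o) {
    assert(o != NULL);
    if (o->timeout_ms < 0) return false;
    if (o->force && (o->timeout_ms == 0 || o->host == NULL)) return false;
    return true;
}
```

Under these assumptions, `FAILOVER TO host port FORCE TIMEOUT 10000` passes the check, while `FAILOVER FORCE TIMEOUT 10000` (no target) and `FAILOVER TO host port FORCE` (no deadline) do not.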

2. Usage examples

  # Fail over to a specific host and port
  FAILOVER TO 192.173.22.1 6639
  # Force the switchover once the timeout expires, even if the replica has not caught up
  # FORCE requires both host/port and TIMEOUT
  FAILOVER TO 192.173.22.1 6639 FORCE TIMEOUT 10000
  # Abort a switchover that is in progress
  FAILOVER ABORT

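3. Caveats

  • FORCE can cause data loss, to a degree that depends on how far the replica is behind.

  • Use FORCE with caution in production. FAILOVER gives you a way to switch roles manually, but apply it according to the needs of your scenario so that it does not cause a service interruption.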

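Source code analysis

Redis source version: 6.2.5

The core flow of a switchover is: the FAILOVER command starts the switchover (by setting the corresponding state flags); the master periodically checks whether a replica has finished syncing; once it has, the old master connects to the target replica (the prospective new master), completes the handshake and sends PSYNC FAILOVER so that the target replica becomes the new master. This article therefore walks through the code in the following order:

  • Handling the FAILOVER command

  • Detecting that a replica has finished syncing

  • Executing the switchover

  • The replica becoming master

Switchover-related variables

Before looking at each scenario, let's get familiar with the variables involved in a switchover.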

  // path: src/server.h
  /* The state of an in progress coordinated failover */
  typedef enum {
      // No switchover in progress (or the last one has finished)
      NO_FAILOVER = 0,        /* No failover in progress */
      // Waiting for the replica to finish syncing data
      FAILOVER_WAIT_FOR_SYNC, /* Waiting for target replica to catch up */
      // Switchover in progress:
      // PSYNC FAILOVER is about to be sent to the replica, or has been sent
      // and we are waiting for its reply
      FAILOVER_IN_PROGRESS    /* Waiting for target replica to accept
                               * PSYNC FAILOVER request. */
  } failover_state;

  struct redisServer {
      // ...
      /* Coordinate failover info */
      // Variables for a manually triggered switchover in non-cluster mode
      // Deadline for the switchover (what happens at end_time depends on force_failover)
      mstime_t failover_end_time; /* Deadline for failover command. */
      // Whether to force the switchover at the deadline
      // (forced: no need to wait for the replica to finish syncing)
      int force_failover;         /* If true then failover will be forced at the
                                   * deadline, otherwise failover is aborted. */
      // A non-NULL host (with its port) designates the replica to promote
      char *target_replica_host;  /* Failover target host. If null during a
                                   * failover then any replica can be used. */
      int target_replica_port;    /* Failover target port */
      // Current switchover state:
      // NO_FAILOVER, FAILOVER_WAIT_FOR_SYNC, FAILOVER_IN_PROGRESS
      int failover_state;         /* Failover state */
  };

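  • failover_state is the core variable of the switchover: it records whether a switchover is in progress and which stage it has reached.

  • Running the FAILOVER command essentially checks and sets these (non-cluster) switchover variables.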
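The three states form a small state machine. The sketch below models the transitions described in this article; it is a simplified illustration, not Redis code. The `failover_event` enum and the `failover_next` function are hypothetical names, and the events are an assumption about how to group the triggers (command accepted, replica caught up, deadline reached, PSYNC reply, ABORT).

```c
#include <assert.h>

typedef enum {
    NO_FAILOVER = 0,
    FAILOVER_WAIT_FOR_SYNC,
    FAILOVER_IN_PROGRESS
} failover_state;

/* Hypothetical event names, used only for this illustration. */
typedef enum {
    EV_COMMAND_ACCEPTED,   /* FAILOVER passed all checks */
    EV_REPLICA_CAUGHT_UP,  /* target replica acknowledged the full stream */
    EV_DEADLINE_FORCE,     /* timeout reached, FORCE set */
    EV_DEADLINE_NO_FORCE,  /* timeout reached, FORCE not set */
    EV_PSYNC_ACCEPTED,     /* target answered +CONTINUE or +FULLRESYNC */
    EV_PSYNC_REJECTED,     /* any other PSYNC outcome */
    EV_ABORT               /* FAILOVER ABORT */
} failover_event;

/* Returns the state that follows state s when event e occurs. */
failover_state failover_next(failover_state s, failover_event e) {
    if (e == EV_ABORT) return NO_FAILOVER;
    switch (s) {
    case NO_FAILOVER:
        return e == EV_COMMAND_ACCEPTED ? FAILOVER_WAIT_FOR_SYNC : s;
    case FAILOVER_WAIT_FOR_SYNC:
        if (e == EV_REPLICA_CAUGHT_UP || e == EV_DEADLINE_FORCE)
            return FAILOVER_IN_PROGRESS;
        if (e == EV_DEADLINE_NO_FORCE) return NO_FAILOVER;
        return s;
    case FAILOVER_IN_PROGRESS:
        /* Success and failure both end the failover and reset the state. */
        if (e == EV_PSYNC_ACCEPTED || e == EV_PSYNC_REJECTED)
            return NO_FAILOVER;
        return s;
    }
    return s;
}
```

Note that both a successful and a rejected PSYNC lead back to NO_FAILOVER; what differs is whether the node ends up as a replica of the new master or stays master.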

Handling the FAILOVER command

  /*
   * FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>]
   *
   * This command will coordinate a failover between the master and one
   * of its replicas. The happy path contains the following steps:
   * 1) The master will initiate a client pause write, to stop replication
   * traffic.
   * 2) The master will periodically check if any of its replicas has
   * consumed the entire replication stream through acks.
   * 3) Once any replica has caught up, the master will itself become a replica.
   * 4) The master will send a PSYNC FAILOVER request to the target replica, which
   * if accepted will cause the replica to become the new master and start a sync.
   *
   * FAILOVER ABORT is the only way to abort a failover command, as replicaof
   * will be disabled. This may be needed if the failover is unable to progress.
   *
   * The optional arguments [TO <HOST> <PORT>] allows designating a specific replica
   * to be failed over to.
   *
   * FORCE flag indicates that even if the target replica is not caught up,
   * failover to it anyway. This must be specified with a timeout and a target
   * HOST and PORT.
   *
   * TIMEOUT <timeout> indicates how long should the primary wait for
   * a replica to sync up before aborting. If not specified, the failover
   * will attempt forever and must be manually aborted.
   */

  // FAILOVER command handler (master-replica switchover in non-cluster mode)
  void failoverCommand(client *c) {
      if (server.cluster_enabled) {
          // In cluster mode a manual switchover must use CLUSTER FAILOVER
          addReplyError(c,"FAILOVER not allowed in cluster mode. "
                          "Use CLUSTER FAILOVER command instead.");
          return;
      }

      /* Handle special case for abort */
      if ((c->argc == 2) && !strcasecmp(c->argv[1]->ptr,"abort")) {
          // FAILOVER ABORT
          if (server.failover_state == NO_FAILOVER) {
              // No switchover is in progress: reply with an error
              addReplyError(c, "No failover in progress.");
              return;
          }
          // Abort the switchover
          abortFailover("Failover manually aborted");
          addReply(c,shared.ok);
          return;
      }

      long timeout_in_ms = 0;
      int force_flag = 0;
      long port = 0;
      char *host = NULL;

      /* Parse the command for syntax and arguments. */
      // Starting a switchover: parse the arguments
      for (int j = 1; j < c->argc; j++) {
          if (!strcasecmp(c->argv[j]->ptr,"timeout") && (j + 1 < c->argc) &&
              timeout_in_ms == 0)
          {
              // TIMEOUT value
              if (getLongFromObjectOrReply(c,c->argv[j + 1],
                          &timeout_in_ms,NULL) != C_OK) return;
              if (timeout_in_ms <= 0) {
                  addReplyError(c,"FAILOVER timeout must be greater than 0");
                  return;
              }
              j++;
          } else if (!strcasecmp(c->argv[j]->ptr,"to") && (j + 2 < c->argc) &&
              !host)
          {
              // A target replica was given: read host and port
              if (getLongFromObjectOrReply(c,c->argv[j + 2],&port,NULL) != C_OK)
                  return;
              host = c->argv[j + 1]->ptr;
              j += 2;
          } else if (!strcasecmp(c->argv[j]->ptr,"force") && !force_flag) {
              // Forced mode: once the timeout expires, switch over even if the
              // replica has not caught up (client writes are still paused
              // while waiting)
              force_flag = 1;
          } else {
              // Any other argument is a syntax error
              addReplyErrorObject(c,shared.syntaxerr);
              return;
          }
      }

      if (server.failover_state != NO_FAILOVER) {
          // A switchover is already in progress: FAILOVER cannot be run again
          addReplyError(c,"FAILOVER already in progress.");
          return;
      }

      if (server.masterhost) {
          // This node is a replica, not a master: FAILOVER is not allowed
          addReplyError(c,"FAILOVER is not valid when server is a replica.");
          return;
      }

      if (listLength(server.slaves) == 0) {
          // Without replicas there is nothing to switch over to
          addReplyError(c,"FAILOVER requires connected replicas.");
          return;
      }

      if (force_flag && (!timeout_in_ms || !host)) {
          // FORCE requires both host/port and TIMEOUT
          addReplyError(c,"FAILOVER with force option requires both a timeout "
              "and target HOST and IP.");
          return;
      }

      /* If a replica address was provided, validate that it is connected. */
      if (host) {
          // host/port given: look up the matching replica client
          client *replica = findReplica(host, port);

          if (replica == NULL) {
              // Not found: reply with an error
              addReplyError(c,"FAILOVER target HOST and PORT is not "
                              "a replica.");
              return;
          }

          /* Check if requested replica is online */
          if (replica->replstate != SLAVE_STATE_ONLINE) {
              // The replica is not online: the switchover cannot start
              addReplyError(c,"FAILOVER target replica is not online.");
              return;
          }

          // Store host/port in server.target_replica_host/server.target_replica_port
          server.target_replica_host = zstrdup(host);
          server.target_replica_port = port;
          serverLog(LL_NOTICE,"FAILOVER requested to %s:%ld.",host,port);
      } else {
          // No target given: any replica may be promoted
          serverLog(LL_NOTICE,"FAILOVER requested to any replica.");
      }

      mstime_t now = mstime();
      if (timeout_in_ms) {
          // Record failover_end_time
          server.failover_end_time = now + timeout_in_ms;
      }

      // Store the FORCE flag
      server.force_failover = force_flag;
      // All checks passed: wait for the replica to finish syncing
      server.failover_state = FAILOVER_WAIT_FOR_SYNC;
      /* Cluster failover will unpause eventually */
      // Pause all clients issuing write commands (so the replica can catch up)
      pauseClients(LLONG_MAX,CLIENT_PAUSE_WRITE);
      addReply(c,shared.ok);
  }

  /* Resets the internal failover configuration, this needs
   * to be called after a failover either succeeds or fails
   * as it includes the client unpause. */
  // Reset the failover (non-cluster switchover) state variables
  // and call unpauseClients to resume clients paused on write commands
  void clearFailoverState() {
      server.failover_end_time = 0;
      server.force_failover = 0;
      zfree(server.target_replica_host);
      server.target_replica_host = NULL;
      server.target_replica_port = 0;
      server.failover_state = NO_FAILOVER;
      unpauseClients();
  }

  /* Abort an ongoing failover if one is going on. */
  // Abort a non-cluster switchover
  // At its core: drop the connection this node actively opened to the replica
  // that was about to become master, reset the failover state variables,
  // and resume clients paused on write commands
  void abortFailover(const char *err) {
      if (server.failover_state == NO_FAILOVER) return;

      if (server.target_replica_host) {
          serverLog(LL_NOTICE,"FAILOVER to %s:%d aborted: %s",
              server.target_replica_host,server.target_replica_port,err);
      } else {
          serverLog(LL_NOTICE,"FAILOVER to any replica aborted: %s",err);
      }
      if (server.failover_state == FAILOVER_IN_PROGRESS) {
          // The switchover is in progress: PSYNC FAILOVER is about to be sent
          // to the replica, or has been sent and we are waiting for the reply.
          // Drop the connection we opened to the replica
          // and set server.masterhost back to NULL
          replicationUnsetMaster();
      }
      // Reset the failover state variables:
      // server.failover_end_time, server.force_failover, server.failover_state,
      // server.target_replica_host, server.target_replica_port,
      // then unpauseClients to resume clients paused on write commands
      clearFailoverState();
  }

Call chain

  void failoverCommand(client *c) // FAILOVER command handler (switchover in non-cluster mode)
    - if FAILOVER ABORT
      - if server.failover_state == NO_FAILOVER
        - send an error reply and return
      - else
        - void abortFailover(const char *err) // abort the non-cluster switchover
          - void replicationUnsetMaster(void) // leave replica state and drop the connection opened to the replica (only in FAILOVER_IN_PROGRESS)
          - void clearFailoverState() // reset the failover state variables
            - void unpauseClients(void) // resume clients paused on write commands
        - send an OK reply and return
    - parse arguments: host/port, force, timeout
    - if server.failover_state != NO_FAILOVER
      - send an error reply and return // a switchover is already in progress, FAILOVER cannot be run again
    - if server.masterhost
      - send an error reply and return // this node is a replica, not a master
    - if listLength(server.slaves) == 0
      - send an error reply and return // no replicas, nothing to switch over to
    - if force_flag && (!timeout_in_ms || !host)
      - send an error reply and return // FORCE requires both host/port and TIMEOUT
    - if host
      - look up the connected replica client for host/port; send an error reply and return if it is not found or not online
    - set server.failover_end_time
    - set server.force_failover
    - server.failover_state = FAILOVER_WAIT_FOR_SYNC
    - void pauseClients(mstime_t end, pause_type type) // pause all clients issuing writes (so the replica can catch up)

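In summary:

  • FAILOVER ABORT checks whether a switchover is in progress. If so, it restores the node to master role (when needed), resets the switchover variables and unpauses clients.

  • Starting a switchover mainly checks the relevant state, sets the switchover variables and pauses clients. The state becomes FAILOVER_WAIT_FOR_SYNC, that is, waiting for a replica to finish syncing.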

Detecting that a replica has finished syncing

  // path: src/server.c
  // Called after all client events (commands) have been processed,
  // right before entering epoll_wait
  void beforeSleep(struct aeEventLoop *eventLoop) {
      // ...
      /* We may have received updates from clients about their current offset. NOTE:
       * this can't be done where the ACK is received since failover will disconnect
       * our clients. */
      // During a (non-cluster) switchover, check whether a replica has caught up
      // and, if so, move on to the switchover stage
      updateFailoverStatus();
      // ...
  }

  // path: src/replication.c
  /* Replication cron function, called 1 time per second. */
  // Periodic replication housekeeping
  void replicationCron(void) {
      static long long replication_cron_loops = 0;

      /* Check failover status first, to see if we need to start
       * handling the failover. */
      // During a (non-cluster) switchover, check whether a replica has caught up
      // and, if so, move on to the switchover stage
      updateFailoverStatus();
      // ...
  }

  /* Failover cron function, checks coordinated failover state.
   *
   * Implementation note: The current implementation calls replicationSetMaster()
   * to start the failover request, this has some unintended side effects if the
   * failover doesn't work like blocked clients will be unblocked and replicas will
   * be disconnected. This could be optimized further.
   */
  // During a (non-cluster) switchover, check whether a replica has caught up
  // and, if so, move on to the switchover stage
  // Called from beforeSleep and replicationCron
  void updateFailoverStatus(void) {
      // Not waiting for a replica to catch up: nothing to do
      if (server.failover_state != FAILOVER_WAIT_FOR_SYNC) return;
      mstime_t now = server.mstime;

      /* Check if failover operation has timed out */
      if (server.failover_end_time && server.failover_end_time <= now) {
          // A deadline is set and has been reached
          if (server.force_failover) {
              serverLog(LL_NOTICE,
                  "FAILOVER to %s:%d time out exceeded, failing over.",
                  server.target_replica_host, server.target_replica_port);
              // Forced mode: switch over as soon as the deadline is reached
              server.failover_state = FAILOVER_IN_PROGRESS;
              /* If timeout has expired force a failover if requested. */
              // Connect to the new master as its replica and start replication
              replicationSetMaster(server.target_replica_host,
                  server.target_replica_port);
              return;
          } else {
              // Not forced and still waiting for the replica at the deadline:
              // abort the switchover
              /* Force was not requested, so timeout. */
              abortFailover("Replica never caught up before timeout");
              return;
          }
      }

      /* Check to see if the replica has caught up so failover can start */
      client *replica = NULL;
      if (server.target_replica_host) {
          // A target was designated: look up that replica
          replica = findReplica(server.target_replica_host,
              server.target_replica_port);
      } else {
          listIter li;
          listNode *ln;

          listRewind(server.slaves,&li);
          /* Find any replica that has matched our repl_offset */
          // Otherwise walk all replicas and pick one that has caught up
          while((ln = listNext(&li))) {
              replica = ln->value;
              if (replica->repl_ack_off == server.master_repl_offset) {
                  char ip[NET_IP_STR_LEN], *replicaaddr = replica->slave_addr;

                  if (!replicaaddr) {
                      if (connPeerToString(replica->conn,ip,sizeof(ip),NULL) == -1)
                          continue;
                      replicaaddr = ip;
                  }

                  /* We are now failing over to this specific node */
                  server.target_replica_host = zstrdup(replicaaddr);
                  server.target_replica_port = replica->slave_listening_port;
                  break;
              }
          }
      }

      /* We've found a replica that is caught up */
      if (replica && (replica->repl_ack_off == server.master_repl_offset)) {
          // A replica exists and has caught up:
          // enter the switchover stage
          server.failover_state = FAILOVER_IN_PROGRESS;
          serverLog(LL_NOTICE,
                  "Failover target %s:%d is synced, failing over.",
                  server.target_replica_host, server.target_replica_port);
          /* Designated replica is caught up, failover to it. */
          // Connect to the new master as its replica and start replication
          replicationSetMaster(server.target_replica_host,
              server.target_replica_port);
      }
  }

Call chain

  void beforeSleep(struct aeEventLoop *eventLoop) // called before entering epoll_wait
    - void updateFailoverStatus(void) // check whether a replica has caught up during a (non-cluster) switchover and move on to the switchover stage
      - if server.failover_end_time && server.failover_end_time <= now // a deadline is set and has been reached
        - if the FORCE flag is set
          - server.failover_state = FAILOVER_IN_PROGRESS; // forced mode: switch over at the deadline
          - void replicationSetMaster(char *ip, int port); // connect to the new master as its replica and start replication
        - else
          - void abortFailover(const char *err) // not forced and still waiting for the replica at the deadline: abort the switchover
      - if server.target_replica_host
        - a target was designated: look up that replica
      - else
        - walk all replicas and pick one that has caught up
      - if replica && (replica->repl_ack_off == server.master_repl_offset) // a replica exists and has caught up
        - server.failover_state = FAILOVER_IN_PROGRESS; // enter the switchover stage
        - void replicationSetMaster(char *ip, int port); // connect to the prospective new master as its replica and start replication
  void replicationCron(void) // periodic replication housekeeping
    - void updateFailoverStatus(void) // check whether a replica has caught up during a (non-cluster) switchover and move on to the switchover stage
      - ....

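In summary:

  • beforeSleep and replicationCron check whether the switchover deadline has been reached. If it has, the FORCE flag decides whether the switchover is aborted or forced (FAILOVER_IN_PROGRESS).

  • If the deadline has not been reached, the master checks whether the relevant replica has caught up, and if so starts the switchover (FAILOVER_IN_PROGRESS).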
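The decision made on each check can be condensed into a pure function. This is a minimal sketch under stated assumptions, not Redis code: `failover_action` and `failover_decide` are hypothetical names, and the replica lookup is reduced to a single `target_caught_up` flag. As in `updateFailoverStatus`, the deadline is evaluated before the caught-up check.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical result type for this illustration. */
typedef enum {
    ACTION_KEEP_WAITING,   /* stay in FAILOVER_WAIT_FOR_SYNC */
    ACTION_ABORT,          /* deadline reached without FORCE */
    ACTION_START_FAILOVER  /* move to FAILOVER_IN_PROGRESS */
} failover_action;

/* end_time_ms == 0 means no deadline was configured. */
failover_action failover_decide(long long now_ms, long long end_time_ms,
                                bool force, bool target_caught_up) {
    assert(now_ms >= 0 && end_time_ms >= 0);
    /* The deadline is checked first, exactly once it is reached or passed. */
    if (end_time_ms != 0 && end_time_ms <= now_ms)
        return force ? ACTION_START_FAILOVER : ACTION_ABORT;
    /* No deadline yet: proceed only when the replica has caught up. */
    return target_caught_up ? ACTION_START_FAILOVER : ACTION_KEEP_WAITING;
}
```

One consequence of this ordering: without FORCE, a switchover whose deadline has already passed is aborted on the next check even if the replica caught up in the meantime.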

Executing the switchover

  // path: src/replication.c
  /* Set replication to the specified master address and port. */
  // If this node was a replica, drop the connection to its old master.
  // If this node was a master, turn its own replication info
  // (server.replid and server.master_repl_offset) into a cached master client.
  // Then connect to the new master.
  void replicationSetMaster(char *ip, int port) {
      int was_master = server.masterhost == NULL;

      sdsfree(server.masterhost);
      server.masterhost = NULL;
      // Drop the connection to the old master
      if (server.master) {
          freeClient(server.master);
      }
      // Disconnect all blocked clients: this node may be turning from master
      // into replica, and some blocked clients can only be unblocked on a master
      disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

      /* Setting masterhost only after the call to freeClient since it calls
       * replicationHandleMasterDisconnection which can trigger a re-connect
       * directly from within that call. */
      // Store the new master's IP and port
      server.masterhost = sdsnew(ip);
      server.masterport = port;

      /* Update oom_score_adj */
      setOOMScoreAdj(-1);

      /* Force our slaves to resync with us as well. They may hopefully be able
       * to partially resync with us, but we can notify the replid change. */
      // The master has changed: disconnect this node's own replicas
      disconnectSlaves();
      cancelReplicationHandshake(0);
      /* Before destroying our master state, create a cached master using
       * our own parameters, to later PSYNC with the new master. */
      if (was_master) {
          // This node was a master: turn its own replication info
          // (server.replid and server.master_repl_offset) into a cached master client
          replicationDiscardCachedMaster();
          replicationCacheMasterUsingMyself();
      }

      /* Fire the role change modules event. */
      moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
                            REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
                            NULL);

      /* Fire the master link modules event. */
      if (server.repl_state == REPL_STATE_CONNECTED)
          moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                                REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                                NULL);

      server.repl_state = REPL_STATE_CONNECT;
      serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
          server.masterhost, server.masterport);
      // Connect to the new master
      connectWithMaster();
  }

  // Connect to the master
  int connectWithMaster(void) {
      // Create a TLS or plain socket connection depending on server.tls_replication
      server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
      // Start connecting and register syncWithMaster as the connect callback
      if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
                  NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
          // On error, close the connection
          serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                  connGetLastError(server.repl_transfer_s));
          connClose(server.repl_transfer_s);
          server.repl_transfer_s = NULL;
          return C_ERR;
      }

      // Record the timestamp of the connection attempt
      server.repl_transfer_lastio = server.unixtime;
      // Move to the connecting state
      server.repl_state = REPL_STATE_CONNECTING;
      serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
      return C_OK;
  }

  /* This handler fires when the non blocking connect was able to
   * establish a connection with the master. */
  // Callback invoked once the connection to the master is established,
  // and again during the handshake stages
  // Its core job is the handshake with the master and the RDB file transfer
  void syncWithMaster(connection *conn) {
      // Handshake: exchange information
      //...

      /* Try a partial resynchonization. If we don't have a cached master
       * slaveTryPartialResynchronization() will at least try to use PSYNC
       * to start a full resynchronization so that we get the master replid
       * and the global offset, to try a partial resync at the next
       * reconnection attempt. */
      if (server.repl_state == REPL_STATE_SEND_PSYNC) {
          // PSYNC sending stage:
          // send the PSYNC command
          if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
              err = sdsnew("Write error sending the PSYNC command.");
              abortFailover("Write error to failover target");
              goto write_error;
          }
          // Now waiting for the PSYNC reply
          server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
          return;
      }

      /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
      // At this point we must be waiting for the PSYNC reply
      if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
          serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                               "state should be RECEIVE_PSYNC but is %d",
                               server.repl_state);
          goto error;
      }

      // read_reply = 1: receive and process the PSYNC reply
      psync_result = slaveTryPartialResynchronization(conn,1);
      // The full reply has not arrived yet: handle it on the next call
      if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

      /* Check the status of the planned failover. We expect PSYNC_CONTINUE,
       * but there is nothing technically wrong with a full resync which
       * could happen in edge cases. */
      if (server.failover_state == FAILOVER_IN_PROGRESS) {
          // Switchover stage of the failover
          if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) {
              // The target asks for a partial or full resync, so the failover
              // succeeded: the prospective new master really became master
              // and this node went from master to replica.
              // Reset the failover state and unpause clients
              clearFailoverState();
          } else {
              // Any other result: the failover failed
              abortFailover("Failover target rejected psync request");
              return;
          }
      }
      // From here on the target is the new master and this node is its replica.
      // What follows handles the full or partial resync
      // ...
  }

  // The master cannot serve the replication link right now (for example, it is loading):
  // PSYNC_TRY_LATER means "try again later"
  #define PSYNC_TRY_LATER 5
  int slaveTryPartialResynchronization(connection *conn, int read_reply) {
      char *psync_replid;
      char psync_offset[32];
      sds reply;

      /* Writing half */
      if (!read_reply) {
          // Send the command
          /* Initially set master_initial_offset to -1 to mark the current
           * master replid and offset as not valid. Later if we'll be able to do
           * a FULL resync using the PSYNC command we'll set the offset at the
           * right value, so that this information will be propagated to the
           * client structure representing the master into server.master. */
          // master_initial_offset = -1 marks the current master replid and offset as invalid
          server.master_initial_offset = -1;

          if (server.cached_master) {
              psync_replid = server.cached_master->replid;
              snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
              serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
          } else {
              serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
              psync_replid = "?";
              memcpy(psync_offset,"-1",3);
          }

          /* Issue the PSYNC command, if this is a master with a failover in
           * progress then send the failover argument to the replica to cause it
           * to become a master */
          if (server.failover_state == FAILOVER_IN_PROGRESS) {
              // Failover in progress: add the FAILOVER argument so that the
              // replica (the prospective new master) switches to master
              reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL);
          } else {
              // No failover: send a plain PSYNC
              reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
          }

          if (reply != NULL) {
              serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
              sdsfree(reply);
              connSetReadHandler(conn, NULL);
              return PSYNC_WRITE_ERROR;
          }
          return PSYNC_WAIT_REPLY;
      }

      /* Reading half */
      // Read the reply from the TCP connection
      reply = receiveSynchronousResponse(conn);
      if (sdslen(reply) == 0) {
          /* The master may send empty newlines after it receives PSYNC
           * and before to reply, just to keep the connection alive. */
          sdsfree(reply);
          // No complete reply yet: return PSYNC_WAIT_REPLY
          return PSYNC_WAIT_REPLY;
      }

      // Stop listening for read events
      connSetReadHandler(conn, NULL);

      if (!strncmp(reply,"+FULLRESYNC",11)) {
          // A full resync is required
          char *replid = NULL, *offset = NULL;

          /* FULL RESYNC, parse the reply in order to extract the replid
           * and the replication offset. */
          // Parse the master's replid and offset
          replid = strchr(reply,' ');
          if (replid) {
              replid++;
              offset = strchr(replid,' ');
              if (offset) offset++;
          }
          if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
              serverLog(LL_WARNING,
                  "Master replied with wrong +FULLRESYNC syntax.");
              /* This is an unexpected condition, actually the +FULLRESYNC
               * reply means that the master supports PSYNC, but the reply
               * format seems wrong. To stay safe we blank the master
               * replid to make sure next PSYNCs will fail. */
              // Abnormal condition: clear master_replid so later PSYNCs fail
              memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
          } else {
              // Store the master's replid and offset
              memcpy(server.master_replid, replid, offset-replid-1);
              server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
              server.master_initial_offset = strtoll(offset,NULL,10);
              serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                  server.master_replid,
                  server.master_initial_offset);
          }
          /* We are going to full resync, discard the cached master structure. */
          // Discard the cached master
          replicationDiscardCachedMaster();
          sdsfree(reply);
          return PSYNC_FULLRESYNC;
      }

      if (!strncmp(reply,"+CONTINUE",9)) {
          // Partial resync reply received
          /* Partial resync was accepted. */
          serverLog(LL_NOTICE,
              "Successful partial resynchronization with master.");

          /* Check the new replication ID advertised by the master. If it
           * changed, we need to set the new ID as primary ID, and set or
           * secondary ID as the old master ID up to the current offset, so
           * that our sub-slaves will be able to PSYNC with us after a
           * disconnection. */
          char *start = reply+10;
          char *end = reply+9;
          while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
          if (end-start == CONFIG_RUN_ID_SIZE) {
              // Parse the master's replication ID from the reply
              char new[CONFIG_RUN_ID_SIZE+1];
              memcpy(new,start,CONFIG_RUN_ID_SIZE);
              new[CONFIG_RUN_ID_SIZE] = '\0';

              if (strcmp(new,server.cached_master->replid)) {
                  // The master ID has changed
                  /* Master ID changed. */
                  serverLog(LL_WARNING,"Master replication ID changed to %s",new);

                  /* Set the old ID as our ID2, up to the current offset+1. */
                  // Keep the old master ID in server.replid2
                  memcpy(server.replid2,server.cached_master->replid,
                      sizeof(server.replid2));
                  // Keep the current replication offset (+1) in second_replid_offset
                  server.second_replid_offset = server.master_repl_offset+1;

                  /* Update the cached master ID and our own primary ID to the
                   * new one. */
                  // Store the new ID taken from the reply
                  memcpy(server.replid,new,sizeof(server.replid));
                  memcpy(server.cached_master->replid,new,sizeof(server.replid));

                  /* Disconnect all the sub-slaves: they need to be notified. */
                  // Disconnect the replicas attached to this node
                  disconnectSlaves();
              }
          }

          /* Setup the replication to continue. */
          sdsfree(reply);
          // Used when PSYNC is answered with CONTINUE (partial resync):
          // turn the cached master client into the current master client,
          // set server.repl_state to REPL_STATE_CONNECTED (link to master established),
          // and install readQueryFromClient as the master client's read handler
          replicationResurrectCachedMaster(conn);

          /* If this instance was restarted and we read the metadata to
           * PSYNC from the persistence file, our replication backlog could
           * be still not initialized. Create it. */
          if (server.repl_backlog == NULL) createReplicationBacklog();
          // PSYNC_CONTINUE means partial resync
          return PSYNC_CONTINUE;
      }

      /* If we reach this point we received either an error (since the master does
       * not understand PSYNC or because it is in a special state and cannot
       * serve our request), or an unexpected reply from the master.
       *
       * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
       * return PSYNC_TRY_LATER if we believe this is a transient error. */

      if (!strncmp(reply,"-NOMASTERLINK",13) ||
          !strncmp(reply,"-LOADING",8))
      {
          // The master cannot serve the replication link right now (for example, it is loading):
          // return PSYNC_TRY_LATER, meaning "try again later"
          serverLog(LL_NOTICE,
              "Master is currently unable to PSYNC "
              "but should be in the future: %s", reply);
          sdsfree(reply);
          return PSYNC_TRY_LATER;
      }

      // Reaching this point means PSYNC is not supported or the reply was
      // unexpected: log it
      if (strncmp(reply,"-ERR",4)) {
          /* If it's not an error, log the unexpected event. */
          serverLog(LL_WARNING,
              "Unexpected reply to PSYNC from master: %s", reply);
      } else {
          serverLog(LL_NOTICE,
              "Master does not support PSYNC or is in "
              "error state (reply: %s)", reply);
      }
      sdsfree(reply);
      // Discard the cached master
      replicationDiscardCachedMaster();
      // PSYNC is not supported
      return PSYNC_NOT_SUPPORTED;
  }

Call chain

  void beforeSleep(struct aeEventLoop *eventLoop) // called before entering epoll_wait
    - void updateFailoverStatus(void) // check whether a replica has caught up during a (non-cluster) switchover and move on to the switchover stage
      - void replicationSetMaster(char *ip, int port); // a replica exists and has caught up: connect to the new master as its replica and start replication
        - int connectWithMaster(void) // connect to the new master
          - set server.repl_state = REPL_STATE_CONNECTING;
          - register syncWithMaster as the connect callback; its core job is the handshake with the master and the RDB file transfer
  void syncWithMaster(connection *conn) // callback invoked once the connection to the master is established, and during the handshake stages
    - handshake and information exchange
    - slaveTryPartialResynchronization(conn,0) // read_reply = 0: send PSYNC
      - reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL); // failover in progress: add the FAILOVER argument so the replica switches to master
    - psync_result = slaveTryPartialResynchronization(conn,1); // read_reply = 1: receive and process the PSYNC reply
    - if (server.failover_state == FAILOVER_IN_PROGRESS) // switchover stage of the failover
      - if psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC
        - clearFailoverState(); // the target asks for a partial or full resync, so the failover succeeded and this node went from master to replica; reset the failover state and unpause clients
      - else
        - abortFailover("Failover target rejected psync request"); // any other result: the failover failed
        - return;
    - continue with the partial or full resync // at this point this node is already a replica of the new master

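In summary:

  • Once the replica has caught up (or the deadline is reached with FORCE set), replicationSetMaster connects this node to the new master and registers syncWithMaster as the callback.

  • syncWithMaster is the callback that runs after the connection to the master is established and during the handshake stages. When it sends PSYNC and detects a manual switchover in progress, it sends PSYNC FAILOVER, which makes the prospective new master promote itself to master.

  • When the PSYNC reply arrives from the prospective new master: if it executed the request and answered with a partial or full resync, it has really become master, and the switchover state is cleared. Otherwise the switchover failed and is aborted.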
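How the reply prefix maps to the outcome of the switchover can be sketched as follows. This is an illustrative simplification, not Redis code: `psync_reply_kind`, `classify_psync_reply` and `failover_succeeded` are hypothetical names, and the real function also parses the replication ID and offset out of the reply, which is left out here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical classification of the first PSYNC reply line. */
typedef enum {
    REPLY_FULLRESYNC,    /* "+FULLRESYNC <replid> <offset>" */
    REPLY_CONTINUE,      /* "+CONTINUE [<replid>]" */
    REPLY_TRY_LATER,     /* "-NOMASTERLINK" or "-LOADING" */
    REPLY_NOT_SUPPORTED  /* "-ERR ..." or anything unexpected */
} psync_reply_kind;

/* Classifies a reply by its prefix, in the same order as the walkthrough. */
psync_reply_kind classify_psync_reply(const char *reply) {
    assert(reply != NULL);
    if (!strncmp(reply, "+FULLRESYNC", 11)) return REPLY_FULLRESYNC;
    if (!strncmp(reply, "+CONTINUE", 9)) return REPLY_CONTINUE;
    if (!strncmp(reply, "-NOMASTERLINK", 13) ||
        !strncmp(reply, "-LOADING", 8))
        return REPLY_TRY_LATER;
    return REPLY_NOT_SUPPORTED;
}

/* During FAILOVER_IN_PROGRESS only a full or partial resync counts as
 * success; every other outcome leads to the failover being aborted. */
bool failover_succeeded(psync_reply_kind kind) {
    return kind == REPLY_FULLRESYNC || kind == REPLY_CONTINUE;
}
```

In this model a transient answer such as `-LOADING` is not retried while a failover is in progress: it falls into the "any other result" branch and the failover is aborted.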

The replica becoming master

  /* SYNC and PSYNC command implementation. */
  // Handler for the SYNC and PSYNC commands
  void syncCommand(client *c) {
      /* ignore SYNC if already slave or in monitor mode */
      // The client is already flagged as a slave: a SYNC was handled before
      if (c->flags & CLIENT_SLAVE) return;

      /* Check if this is a failover request to a replica with the same replid and
       * become a master if so. */
      if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") &&
          !strcasecmp(c->argv[3]->ptr,"failover"))
      {
          // PSYNC FAILOVER handling
          serverLog(LL_WARNING, "Failover request received for replid %s.",
              (unsigned char *)c->argv[1]->ptr);
          if (!server.masterhost) {
              addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");
              return;
          }

          // The replid matches this node's replid
          if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
              // Drop the connection to the current master and
              // leave replica state, that is, promote this node to master
              replicationUnsetMaster();
              sds client = catClientInfoString(sdsempty(),c);
              serverLog(LL_NOTICE,
                  "MASTER MODE enabled (failover request from '%s')",client);
              sdsfree(client);
          } else {
              addReplyError(c, "PSYNC FAILOVER replid must match my replid.");
              return;
          }
      }
      // Regular PSYNC data synchronization logic
      // ...
  }

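The code shows that when the prospective new master receives PSYNC FAILOVER, it checks that the replid matches its own. If it does, it calls replicationUnsetMaster to drop the connection to the current (old) master, leave replica state and promote itself to master.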
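The two checks the target replica performs before promoting itself can be sketched in isolation. This is a minimal illustration, not Redis code: `promote_result` and `check_psync_failover` are hypothetical names, and the node's role and replication ID are passed in as plain parameters instead of being read from server state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>  /* strcasecmp (POSIX) */

/* Hypothetical result type for this illustration. */
typedef enum {
    PROMOTE_OK,                  /* replica may promote itself */
    PROMOTE_ERR_ALREADY_MASTER,  /* PSYNC FAILOVER sent to a master */
    PROMOTE_ERR_REPLID_MISMATCH  /* requested replid is not ours */
} promote_result;

/* Mirrors the order of the checks: role first, then replication ID. */
promote_result check_psync_failover(bool is_replica,
                                    const char *my_replid,
                                    const char *requested_replid) {
    assert(my_replid != NULL && requested_replid != NULL);
    if (!is_replica) return PROMOTE_ERR_ALREADY_MASTER;
    if (strcasecmp(requested_replid, my_replid) != 0)
        return PROMOTE_ERR_REPLID_MISMATCH;
    return PROMOTE_OK;
}
```

The replid comparison is what ties the request to the current replication history: a node that follows a different master, and therefore carries a different replication ID, refuses to promote itself.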


Originally published on the WeChat official account 吃瓜技术派 under the title "Redis源码解读-非集群模式下的主从切换机制".
