failover 切换主从
Redis 的 failover 命令可以用于手动触发主从切换。比如当需要进行主服务器维护升级时,可以使用命令先手动切换,升级后再切回
1. 基本语法
FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>]
-
命令必须在主节点执行(本节点不是从节点,并且有从节点连接本节点)
-
TO HOST PORT: 指定将某个特定从节点切换为主节点; 不指定即在符合条件前提下随机选择从节点
-
FORCE :到终止时间(TIMEOUT)时从节点仍未完成数据同步, 强制主从切换。
-
设置FORCE 标记, 必须同时设置 HOST/PORT和TIMEOUT 参数
-
TIMEOUT: 主从切换终止时间。即默认不会超时,会一直pause客户端写命令执行直到从节点满足切换条件进行切换。如果设置终止时间并且非FORCE, 到终止时间时放弃主从切换; 如果设置终止时间并且非ORCE, 到终止时间时强制主从切换(而无需数据同步完成)。
-
ABORT: 终止正在进行的failover(故障转移,主从切换), unpause 客户端写命令执行。
2. 命令使用范例
# 指定 host port
FAILOVER TO 192.173.22.1 6639
# 强制立刻主从切换而无需等待数据同步
# 设置force 标记, 必须同时设置 host/prot 和timeout 参数
FAILOVER TO 192.173.22.1 6639 FORCE timeout 10000
# 终止正在进行的主从切换
FAILOVER ABORT
3. 注意事项
-
FORCE选项可能会造成不同程度的数据丢失
-
在生产环境需谨慎使用FORCE选项,最好根据场景选择 failover 提供了手动切换主从的能力,但需要根据实际场景需求谨慎使用,避免造成服务中断。
源码剖析
redis源码版本:6.2.5
主从切换核心流程为:failover 命令启动主从切换(设置对应标记位); 定时检测从节点数据同步是否完成; 同步完成后old 主与 准新主节点建立链接握手后发送psync failover 让准新主节点切换成新节点。因此本文从以下几个方面带着大家走读代码:
-
failover 命令处理
-
从节点完成数据同步检测
-
主从切换执行
-
从节点切换成主节点
主从切换相关变量
在进入具体场景分析前,先来认识下与主从切换相关变量
// path: src/server.h
/* The state of an in progress coordinated failover */
typedef enum {
// 没有主从切换(或者已经完成)
NO_FAILOVER = 0, /* No failover in progress */
// 等待从节点完成数据同步阶段
FAILOVER_WAIT_FOR_SYNC, /* Waiting for target replica to catch up */
// 正在进行主从切换
// 将给从节点发生PSYNC FAILOVER, 或者已经发送PSYNC FAILOVER 而在等他回复包阶段
FAILOVER_IN_PROGRESS /* Waiting for target replica to accept
* PSYNC FAILOVER request. */
} failover_state;
struct redisServer {
// ...
/* Coordinate failover info */
// 非集群模式下 主动进行 主从切换相关变量
// 主从切换终止时间(如果到ent_time未完成即放弃主从切换)
mstime_t failover_end_time; /* Deadline for failover command. */
// 是否强制主从切换(强制 无需等他从节点完成数据同步)
int force_failover; /* If true then failover will be foreced at the
* deadline, otherwise failover is aborted. */
// host 和 port 不为空即指定 特定从节点切换成主节点
char *target_replica_host; /* Failover target host. If null during a
* failover then any replica can be used. */
int target_replica_port; /* Failover target port */
// 当前切换的状态
// NO_FAILOVER, FAILOVER_WAIT_FOR_SYNC, FAILOVER_IN_PROGRESS
int failover_state; /* Failover state */
}
-
failover_state是 主从切换的核心变量, 代表是否在进行切换已经切换进行到那个流程。
-
运行failover命令时实际上是对主从切换相关变量(非集群模式)进行判断和设置。
failover 命令处理
/*
* FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>]
*
* This command will coordinate a failover between the master and one
* of its replicas. The happy path contains the following steps:
* 1) The master will initiate a client pause write, to stop replication
* traffic.
* 2) The master will periodically check if any of its replicas has
* consumed the entire replication stream through acks.
* 3) Once any replica has caught up, the master will itself become a replica.
* 4) The master will send a PSYNC FAILOVER request to the target replica, which
* if accepted will cause the replica to become the new master and start a sync.
*
* FAILOVER ABORT is the only way to abort a failover command, as replicaof
* will be disabled. This may be needed if the failover is unable to progress.
*
* The optional arguments [TO <HOST> <IP>] allows designating a specific replica
* to be failed over to.
*
* FORCE flag indicates that even if the target replica is not caught up,
* failover to it anyway. This must be specified with a timeout and a target
* HOST and IP.
*
* TIMEOUT <timeout> indicates how long should the primary wait for
* a replica to sync up before aborting. If not specified, the failover
* will attempt forever and must be manually aborted.
*/
// failover 命令处理函数(非集群模式下主从切换)
void failoverCommand(client *c) {
if (server.cluster_enabled) {
// 集群模式下主动主从切换应该使用cluster failover
addReplyError(c,"FAILOVER not allowed in cluster mode. "
"Use CLUSTER FAILOVER command instead.");
return;
}
/* Handle special case for abort */
if ((c->argc == 2) && !strcasecmp(c->argv[1]->ptr,"abort")) {
// abort命令
if (server.failover_state == NO_FAILOVER) {
// 没有处于任何 主从切换阶段, 直接返回错误
addReplyError(c, "No failover in progress.");
return;
}
// 终止主从切换
abortFailover("Failover manually aborted");
addReply(c,shared.ok);
return;
}
long timeout_in_ms = 0;
int force_flag = 0;
long port = 0;
char *host = NULL;
/* Parse the command for syntax and arguments. */
// 发起主从切换, 解析对应参数
for (int j = 1; j < c->argc; j++) {
if (!strcasecmp(c->argv[j]->ptr,"timeout") && (j + 1 < c->argc) &&
timeout_in_ms == 0)
{
// timeout 时间
if (getLongFromObjectOrReply(c,c->argv[j + 1],
&timeout_in_ms,NULL) != C_OK) return;
if (timeout_in_ms <= 0) {
addReplyError(c,"FAILOVER timeout must be greater than 0");
return;
}
j++;
} else if (!strcasecmp(c->argv[j]->ptr,"to") && (j + 2 < c->argc) &&
!host)
{
// 如果指定节点, 获取 host 和port
if (getLongFromObjectOrReply(c,c->argv[j + 2],&port,NULL) != C_OK)
return;
host = c->argv[j + 1]->ptr;
j += 2;
} else if (!strcasecmp(c->argv[j]->ptr,"force") && !force_flag) {
// 强制模式
// 强制立刻主从切换,而无需暂停客户端写(pause)并等待从节点同步数据完毕
force_flag = 1;
} else {
// 其他非法参数直接报错
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
if (server.failover_state != NO_FAILOVER) {
// 当前处于主从切换, 不能再次执行failover,报错
addReplyError(c,"FAILOVER already in progress.");
return;
}
if (server.masterhost) {
// 从节点
// 不是主节点, 无法执行failover
addReplyError(c,"FAILOVER is not valid when server is a replica.");
return;
}
if (listLength(server.slaves) == 0) {
// 没有从节点也无法 进行主从切换
addReplyError(c,"FAILOVER requires connected replicas.");
return;
}
if (force_flag && (!timeout_in_ms || !host)) {
// 设置force 标记, 必须同时设置 host/prot 和timeout 参数
addReplyError(c,"FAILOVER with force option requires both a timeout "
"and target HOST and IP.");
return;
}
/* If a replica address was provided, validate that it is connected. */
if (host) {
// 如果指定host/port, 查找对应从节点 client
client *replica = findReplica(host, port);
if (replica == NULL) {
// 找不到 返回错误回复
addReplyError(c,"FAILOVER target HOST and PORT is not "
"a replica.");
return;
}
/* Check if requested replica is online */
if (replica->replstate != SLAVE_STATE_ONLINE) {
// 从节点不是online, 无法进行主从切换
addReplyError(c,"FAILOVER target replica is not online.");
return;
}
// 保存host/port 在变量 server.target_replica_host/server.target_replica_port
server.target_replica_host = zstrdup(host);
server.target_replica_port = port;
serverLog(LL_NOTICE,"FAILOVER requested to %s:%ld.",host,port);
} else {
// 没有指定从节点,可以切换到任意从节点
serverLog(LL_NOTICE,"FAILOVER requested to any replica.");
}
mstime_t now = mstime();
if (timeout_in_ms) {
// 记录 failover_end_time
server.failover_end_time = now + timeout_in_ms;
}
// 保存force 标记
server.force_failover = force_flag;
// 走到这里, 符合条件,设置为等待从节点完成数据同步
server.failover_state = FAILOVER_WAIT_FOR_SYNC;
/* Cluster failover will unpause eventually */
// 暂停所有写命令的客户端(目的是让从节点完成数据同步)
pauseClients(LLONG_MAX,CLIENT_PAUSE_WRITE);
addReply(c,shared.ok);
}
/* Resets the internal failover configuration, this needs
* to be called after a failover either succeeds or fails
* as it includes the client unpause. */
// 清除failover(非集群模式下主从切换)相关变量状态
// unpauseClients, 停止暂停(执行写命令)的客户端
void clearFailoverState() {
server.failover_end_time = 0;
server.force_failover = 0;
zfree(server.target_replica_host);
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
unpauseClients();
}
/* Abort an ongoing failover if one is going on. */
// 终止非集群模式的主从切换
// 核心是断开主动 与即将成为主的从节点的链接
// 清除failover(非集群模式下主从切换)相关变量状态 并且 停止暂停(执行写命令)的客户端
void abortFailover(const char *err) {
if (server.failover_state == NO_FAILOVER) return;
if (server.target_replica_host) {
serverLog(LL_NOTICE,"FAILOVER to %s:%d aborted: %s",
server.target_replica_host,server.target_replica_port,err);
} else {
serverLog(LL_NOTICE,"FAILOVER to any replica aborted: %s",err);
}
if (server.failover_state == FAILOVER_IN_PROGRESS) {
// 正在进行主从切换
// 将给从节点发生PSYNC FAILOVER, 或者已经发送PSYNC FAILOVER 而在等他回复包阶段
// 断开与从节点主动建立的链接
// server.masterhost 设置为NULL
replicationUnsetMaster();
}
// 清除failover(非集群模式下主从切换)相关变量状态
// 包括 server.failover_end_time, server.force_failover, server.failover_state
// server.target_replica_host , server.target_replica_port
// unpauseClients, 停止暂停(执行写命令)的客户端
clearFailoverState();
}
调用链
void failoverCommand(client *c) // failover 命令处理函数(非集群模式下主从切换)
- if failover abort
- if server.failover_state == NO_FAILOVER
- 发送错误回复包, 返回
- else
- void abortFailover(const char *err) //终止非集群模式的主从切换
- void replicationUnsetMaster(void) // 取消副本状态,断开与从节点主动建立的链接
- void clearFailoverState() // 清除failover(非集群模式下主从切换)相关变量状态
- void unpauseClients(void) // 停止暂停(执行写命令)的客户端
- 发生执行成功回复包, 返回
- 解析参数: host/port, force, timeout
- if server.failover_state != NO_FAILOVER
- 当前处于主从切换, 不能再次执行failover,报错
- if server.masterhost
- 发送错误回复,返回 //从节点,不是主节点, 无法执行failover
- if listLength(server.slaves) == 0
- 发送错误回复,返回 // 没有从节点也无法 进行主从切换
- if force_flag && (!timeout_in_ms || !host)
- 发送错误回复,返回 // 设置force 标记, 必须同时设置 host/prot 和timeout 参数
- if host
- 参试查找对应host的链接客户端,如果找不到 发送错误回复包并且返回 // 指定host/port,找不到 返回错误回复
- 设置 server.failover_end_time
- 设置 server.force_failover
- server.failover_state = FAILOVER_WAIT_FOR_SYNC
- void pauseClients(mstime_t end, pause_type type) // 暂停所有写命令的客户端(目的是让从节点完成数据同步)
综上:
-
failover abort 核心处理逻辑是判断是否处于主从切换阶段, 如果是重置节点为主节点,重置主从切换相关变量,unpause 客户端
-
发起主从切换,主要检查相关状态,并且设置主从切换相关变量,pause 客户端;设置主从切换状态为FAILOVERWAITFOR_SYNC, 即等他从节点完成数据同步
从节点完成数据同步检测
//path: src/server.c
// 处理完所有客户端事件(命令)
// 进入epoll_wait之前
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
/* We may have recieved updates from clients about their current offset. NOTE:
* this can't be done where the ACK is recieved since failover will disconnect
* our clients. */
// 检测主从切换(非集群)时 从节点是否完成数据同步, 变更为进行主从切换阶段
updateFailoverStatus();
// ...
}
//path: src/replication.c
/* Replication cron function, called 1 time per second. */
// 处理主从复制相关的cron
void replicationCron(void) {
static long long replication_cron_loops = 0;
/* Check failover status first, to see if we need to start
* handling the failover. */
// 检测主从切换(非集群)时 从节点是否完成数据同步, 变更为进行主从切换阶段
updateFailoverStatus();
// ...
}
/* Failover cron function, checks coordinated failover state.
*
* Implementation note: The current implementation calls replicationSetMaster()
* to start the failover request, this has some unintended side effects if the
* failover doesn't work like blocked clients will be unblocked and replicas will
* be disconnected. This could be optimized further.
*/
// 检测主从切换(非集群)时 从节点是否完成数据同步, 变更为进行主从切换阶段
// beforeSleep 和 replicationCron 调用
void updateFailoverStatus(void) {
// 不是处于等他从节点完成数据同步阶段,直接返回
if (server.failover_state != FAILOVER_WAIT_FOR_SYNC) return;
mstime_t now = server.mstime;
/* Check if failover operation has timed out */
if (server.failover_end_time && server.failover_end_time <= now) {
// 存在终止时间,并且现在是已经大于等于终止时间
if (server.force_failover) {
serverLog(LL_NOTICE,
"FAILOVER to %s:%d time out exceeded, failing over.",
server.target_replica_host, server.target_replica_port);
// 如果是强制模式, 到达终止时间后强制切换
server.failover_state = FAILOVER_IN_PROGRESS;
/* If timeout has expired force a failover if requested. */
// 以从节点的身份 与新的主节点建立 主从同步
replicationSetMaster(server.target_replica_host,
server.target_replica_port);
return;
} else {
// 非强制模式, 到终止时间仍处于等待从节点完成数据同步阶段
// 直接终止主从切换
/* Force was not requested, so timeout. */
abortFailover("Replica never caught up before timeout");
return;
}
}
/* Check to see if the replica has caught up so failover can start */
client *replica = NULL;
if (server.target_replica_host) {
// 指定节点,获取对应副本
replica = findReplica(server.target_replica_host,
server.target_replica_port);
} else {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
/* Find any replica that has matched our repl_offset */
// 否则遍历所有从节点, 获取已经同步完的从节点
while((ln = listNext(&li))) {
replica = ln->value;
if (replica->repl_ack_off == server.master_repl_offset) {
char ip[NET_IP_STR_LEN], *replicaaddr = replica->slave_addr;
if (!replicaaddr) {
if (connPeerToString(replica->conn,ip,sizeof(ip),NULL) == -1)
continue;
replicaaddr = ip;
}
/* We are now failing over to this specific node */
server.target_replica_host = zstrdup(replicaaddr);
server.target_replica_port = replica->slave_listening_port;
break;
}
}
}
/* We've found a replica that is caught up */
if (replica && (replica->repl_ack_off == server.master_repl_offset)) {
// 如果存在从节点并且从节点已经完成数据同步
// 进入主从切换阶段
server.failover_state = FAILOVER_IN_PROGRESS;
serverLog(LL_NOTICE,
"Failover target %s:%d is synced, failing over.",
server.target_replica_host, server.target_replica_port);
/* Designated replica is caught up, failover to it. */
// 以从节点的身份 与新的主节点建立 主从同步
replicationSetMaster(server.target_replica_host,
server.target_replica_port);
}
}
调用链
void beforeSleep(struct aeEventLoop *eventLoop) // 进入epoll_wait之前调用
- void updateFailoverStatus(void) // 检测主从切换(非集群)时 从节点是否完成数据同步, 变更为进行主从切换阶段
- if server.failover_end_time && server.failover_end_time <= now // 存在终止时间,并且现在是已经大于等于终止时间
- if 设置了force 标记
- server.failover_state = FAILOVER_IN_PROGRESS; // 如果是强制模式, 到达终止时间后强制切换
- void replicationSetMaster(char *ip, int port); // 以从节点的身份 与新的主节点建立 主从同步
- else
- void abortFailover(const char *err) //非强制模式, 到终止时间仍处于等待从节点完成数据同步阶段, 终止非集群模式的主从切换
- if server.target_replica_host
- 指定节点,获取对应副本
- else
- 遍历所有从节点, 获取已经同步完的从节点
- if replica && (replica->repl_ack_off == server.master_repl_offset // 如果存在从节点并且从节点已经完成数据同步
- server.failover_state = FAILOVER_IN_PROGRESS; // 进入主从切换流程
- void replicationSetMaster(char *ip, int port); // 以从节点的身份 与准新主节点建立 主从同步
void replicationCron(void) // 处理主从复制相关的cron
- void updateFailoverStatus(void) // 检测主从切换(非集群)时 从节点是否完成数据同步, 变更为进行主从切换阶段
- ....
综上:
-
在beforesleep 和 replicationCron 检测是否到了主从切换终止时间,如果到了终止时间,根据force 决定是终止主从切换还是强制发起主从切换(FAILOVERINPROGRESS)
-
如果还没到终止时间, 判断相关副本是不是完成数据同步,如果完成发起主从切换(FAILOVERINPROGRESS)
主从切换执行
//path: src/replication.c
/* Set replication to the specified master address and port. */
// 假如原来是从节点,断开与原来的主节点链接
// 假如原来是主节点,将自己原来有关主从同步的信息(server.replid和server.master_repl_offset)
// 变成一个cachemaster client
// 建立起与新主节点的链接
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
server.masterhost = NULL;
// 断开与原来的主节点链接
if (server.master) {
freeClient(server.master);
}
// 断开所有block的客户端
// 可能从主变成从,因此需要断开所有block的客户端
// 因为block客户端可能只有主才能产生unblock
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
/* Setting masterhost only after the call to freeClient since it calls
* replicationHandleMasterDisconnection which can trigger a re-connect
* directly from within that call. */
// 保存新的master 的ip 和端口
server.masterhost = sdsnew(ip);
server.masterport = port;
/* Update oom_score_adj */
setOOMScoreAdj(-1);
/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
// 主节点发生变化,断开本节点的副本(从)节点
disconnectSlaves();
cancelReplicationHandshake(0);
/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
if (was_master) {
// 假如原来是主节点,将自己原来有关主从同步的信息(server.replid和server.master_repl_offset)
// 变成一个cachemaster client
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.repl_state = REPL_STATE_CONNECT;
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
// 与新的主节点建立链接
connectWithMaster();
}
// 链接主节点
int connectWithMaster(void) {
// 根据server.tls_replication 创建对应的socket
server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
// 建立链接,并且设置链接建立后回调为 syncWithMaster函数
if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
// 如果异常,关闭链接
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
connGetLastError(server.repl_transfer_s));
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
return C_ERR;
}
// 保存链接主节点时间戳
server.repl_transfer_lastio = server.unixtime;
// 设置为connecting状态
server.repl_state = REPL_STATE_CONNECTING;
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
return C_OK;
}
/* This handler fires when the non blocking connect was able to
* establish a connection with the master. */
// 与主节点建立链接后 或者 握手阶段 的 回调函数
// 核心是除了与主节点的握手, rdb文件内容传输
void syncWithMaster(connection *conn) {
// 握手交换信息
//...
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master replid
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
// psync发送阶段处理
// 发送psync 命令
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
abortFailover("Write error to failover target");
goto write_error;
}
// 设置处于等待psync回复包阶段
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return;
}
/* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
// 来到此处肯定是 等待psync 回复包阶段
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
"state should be RECEIVE_PSYNC but is %d",
server.repl_state);
goto error;
}
// read_reply设置为1,表示接收处理psync回复包
psync_result = slaveTryPartialResynchronization(conn,1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ // 还没读取到完整回复包,下一次处理
/* Check the status of the planned failover. We expect PSYNC_CONTINUE,
* but there is nothing technically wrong with a full resync which
* could happen in edge cases. */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
// 故障转移主从切换阶段
if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) {
// 要求继续增量同步或者全量同步, 说明failover已经成功,准新主节点真正切换成主节点
// 当前节点从主变成了从
// 清除非集群模式主动切换(故障转移状态),unpauseClients
clearFailoverState();
} else {
// 其它结果,非集群模式主动切换(故障转移状态)失败
abortFailover("Failover target rejected psync request");
return;
}
}
// 执行到此 准新主节点 已经成为新的主节点,当前节点已经是新主节点从节点
// 后续为全量或者增量同步处理
// ...
}
// master 节点本身在loading状态等不能建立主从链接
// 返回PSYNC_TRY_LATER, 代表稍后再试
#define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
// 写命令
/* Initially set master_initial_offset to -1 to mark the current
* master replid and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
// 设置自身的offset为-1, 意味着当前的 master replid and offset 无效
server.master_initial_offset = -1;
if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command, if this is a master with a failover in
* progress then send the failover argument to the replica to cause it
* to become a master */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
// 处于故障转移阶段,带上FAILOVER 参数,让副本(准新主节点)切换为主
reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL);
} else {
// 非故障转移阶段, 发送pysnc
reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
}
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
/* Reading half */
// 从tcp 读取回复包
reply = receiveSynchronousResponse(conn);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
// 没有读到完成回包, 返回PSYNC_WAIT_REPLY
return PSYNC_WAIT_REPLY;
}
// 取消读事件监听
connSetReadHandler(conn, NULL);
if (!strncmp(reply,"+FULLRESYNC",11)) {
// 需要全量同步
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the replid
* and the replication offset. */
// 解析出master的replid 和 offset
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
// 这是一种异常状态, 把master_replid清除,让psync后续步骤失败
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
// 保存master的replid 和 offset
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
// 删除cache master
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply,"+CONTINUE",9)) {
// 收到增量同步回复
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");
/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set or
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != 'r' && end[0] != 'n' && end[0] != '') end++;
if (end-start == CONFIG_RUN_ID_SIZE) {
// 从回复包解析出master RUN_ID
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '';
if (strcmp(new,server.cached_master->replid)) {
// master ID 变化
/* Master ID changed. */
serverLog(LL_WARNING,"Master replication ID changed to %s",new);
/* Set the old ID as our ID2, up to the current offset+1. */
// 将现在的master ID 保存在server.replid2
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
// 将现在副本复制log的 offset 保存second_replid_offset
server.second_replid_offset = server.master_repl_offset+1;
/* Update the cached master ID and our own primary ID to the
* new one. */
// 保存回复包
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));
/* Disconnect all the sub-slaves: they need to be notified. */
// 断开与自己相连的副本节点链接
disconnectSlaves();
}
}
/* Setup the replication to continue. */
sdsfree(reply);
// 用于psync 回复CONTINUE(增量同步时)
// 将cached master client 转化为当前master client
// 设置server.repl_state(与master) 已经建立链接完毕(REPL_STATE_CONNECTED)
// 设置master client的可读事件调用readQueryFromClient
replicationResurrectCachedMaster(conn);
/* If this instance was restarted and we read the metadata to
* PSYNC from the persistence file, our replication backlog could
* be still not initialized. Create it. */
if (server.repl_backlog == NULL) createReplicationBacklog();
// 返回PSYNC_CONTINUE,代表增量同步
return PSYNC_CONTINUE;
}
/* If we reach this point we received either an error (since the master does
* not understand PSYNC or because it is in a special state and cannot
* serve our request), or an unexpected reply from the master.
*
* Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
* return PSYNC_TRY_LATER if we believe this is a transient error. */
if (!strncmp(reply,"-NOMASTERLINK",13) ||
!strncmp(reply,"-LOADING",8))
{
// master 节点本身在loading状态等不能建立主从链接
// 返回PSYNC_TRY_LATER, 代表稍后再试
serverLog(LL_NOTICE,
"Master is currently unable to PSYNC "
"but should be in the future: %s", reply);
sdsfree(reply);
return PSYNC_TRY_LATER;
}
// 走到这里说明不支持psync命令,打印相关日志
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
// 删除缓存的cash master
replicationDiscardCachedMaster();
// 返回不支持psync
return PSYNC_NOT_SUPPORTED;
}
调用链
void beforeSleep(struct aeEventLoop *eventLoop) // 进入epoll_wait之前调用
- void updateFailoverStatus(void) // 检测主从切换(非集群)时 从节点是否完成数据同步, 变更为进行主从切换阶段
- void replicationSetMaster(char *ip, int port); // 如果存在从节点并且从节点已经完成数据同步, 以从节点的身份 与新的主节点建立 主从同步
- int connectWithMaster(void) // 链接新主节点
- 设置server.repl_state 为 REPL_STATE_CONNECTING;
- 设置建立完成回调函数为 syncWithMaster函数,核心作用是与主节点的握手, rdb文件内容传输
void syncWithMaster(connection *conn) // 与主节点建立链接后 或者 握手阶段 的 回调函数
- 握手交互信息等操作
- slaveTryPartialResynchronization(conn,0) // read_reply设置为0, 发送psync
- reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL); //处于故障转移阶段,带上FAILOVER 参数,让副本切换为主
- psync_result = slaveTryPartialResynchronization(conn,1); // read_reply设置为1,表示接收处理psync回复包
- if server.failover_state == FAILOVER_IN_PROGRESS) // 故障转移主从切换阶段
- if psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC
- clearFailoverState(); //要求继续增量同步或者全量同步, 说明failover已经成功,当前节点从主变成了从,清除非集群模式主动切换(故障转移状态),unpauseClients
- else
- abortFailover("Failover target rejected psync request"); // 其它结果,非集群模式主动切换(故障转移状态)失败
- return;
- 执行增量同步或全量同步处理后续操作处理 // 走到此当前节点已经成为新主节点 的从节点, 后续就是全量或者增量同步处理。
综上:
-
在从节点完成全量同步后,调用replicationSetMaster建立本节点与新主节点链接,并且设置syncWithMaster作为回调处理函数
-
syncWithMaster 核心是与主节点建立链接后 或者 握手阶段 的 回调函数; 在syncWithMaster 发送psync 时检测是主动主从切换, 调用PSYNC FAILOVER触发 准新主节点主动切换为主节点。
-
在收到准新主节点的 psync 回复包, 如果新主节点成功执行并返回增量同步或者全量同步,即此时准新主节点已经真正切换成主节点了,清除主从切换状态; 否则即主从切换失败,终止主从切换流程。
从节点切换成主节点
/* SYNC and PSYNC command implementation. */
// SYNC 和 PSYNC命令处理函数
void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */
// 如果已经打上slave, 说明已经处理过sync 命令
if (c->flags & CLIENT_SLAVE) return;
/* Check if this is a failover request to a replica with the same replid and
* become a master if so. */
if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") &&
!strcasecmp(c->argv[3]->ptr,"failover"))
{
// psync failover 处理
serverLog(LL_WARNING, "Failover request received for replid %s.",
(unsigned char *)c->argv[1]->ptr);
if (!server.masterhost) {
addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");
return;
}
// replid 跟当前节点一样
if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
// 断开与当前主节点链接
// 取消副本状态, 即自身切换为主节点
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,
"MASTER MODE enabled (failover request from '%s')",client);
sdsfree(client);
} else {
addReplyError(c, "PSYNC FAILOVER replid must match my replid.");
return;
}
}
// psync 数据同步处理逻辑
// ...
}
走读代码可知,准新主节点收到psync failover, 判断replid 是否合法一致,如果是调用replicationUnsetMaster 函数 断开与当前(old)主节点链接, 取消副本状态自身切换为主节点。
原文始发于微信公众号(吃瓜技术派):Redis源码解读-非集群模式下的主从切换机制
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/236029.html