面对大数据量和高并发场景,Redis集群是一种可扩展和高可用的部署模式。那么,Redis集群是如何创建的呢?本文将从源码级别详细剖析集群的创建过程。
搭建redis集群
首先介绍下如何搭建redis集群
前置准备
-
下载redis源码:
git clone git@github.com:redis/redis.git
cd redis
git checkout 6.2.5
-
编译源码
make
编译链接后,生成一些可执行文件:
redis-server, redis服务端可执行文件
redis-cli, redis客户端,用于连接redis实例并执行命令
创建集群
1. 启动Redis实例
cd utils/create-cluster
./create-cluster start
运行命令后,脚本启动6个redis节点
Starting 30001 # redis实例监听的端口
Starting 30002
Starting 30003
Starting 30004
Starting 30005
Starting 30006
同时可以在create-cluster目录下看到不同实例的集群配置文件(nodes-****.conf),日志文件,和aof 文件
30001.log
30002.log
30003.log
30004.log
30005.log
30006.log
appendonly-30001.aof
appendonly-30002.aof
appendonly-30003.aof
appendonly-30004.aof
appendonly-30005.aof
appendonly-30006.aof
nodes-30001.conf
nodes-30002.conf
nodes-30003.conf
nodes-30004.conf
nodes-30005.conf
nodes-30006.conf
2.创建集群
./create-cluster create
运行命令后,便将上一步运行的实例组成3主3从的redis集群:
redis-cli -p 30001
127.0.0.1:30001> cluster nodes
3385e2a8bf971f9e4fb04b2045174d84517ddd51 127.0.0.1:30003@40003 master - 0 1691231147062 3 connected 10923-16383
6d2dca34675d5457e994df9c4cb7b4197cd5848c 127.0.0.1:30005@40005 slave d0b63c70738d6dffc48a4bee7448abb702479b2e 0 1691231147164 1 connected
d0b63c70738d6dffc48a4bee7448abb702479b2e 127.0.0.1:30001@40001 myself,master - 0 1691231147000 1 connected 0-5460
4f4ac20ec2a7982b684292fdacff7403e16a5d6a 127.0.0.1:30002@40002 master - 0 1691231146960 2 connected 5461-10922
f653d09280f7ac019b4a923c967f547c74eb7164 127.0.0.1:30004@40004 slave 3385e2a8bf971f9e4fb04b2045174d84517ddd51 0 1691231147062 3 connected
4aa656d1423c5c9352f0c03c3aa2b9619b247d5a 127.0.0.1:30006@40006 slave 4f4ac20ec2a7982b684292fdacff7403e16a5d6a 0 1691231147164 2 connected
综上,redis集群的创建流程:
-
create-cluster start 创建redis 实例
-
create-cluster create 对第一步创建的实例列表建立集群关系
create-cluster脚本剖析
#!/bin/bash
#
# Helper for running a local Redis Cluster for testing: it starts/stops a set
# of redis-server instances on consecutive ports and wires them together with
# `redis-cli --cluster create`. Run without arguments to print the usage.

# Settings
# Directory that holds the redis-server and redis-cli binaries.
BIN_PATH="../../src/"
# Host used to build the host:port list passed to `redis-cli --cluster create`.
CLUSTER_HOST=127.0.0.1
# Base port: the instances use PORT+1 .. PORT+NODES.
PORT=30000
# Value passed to --cluster-node-timeout.
TIMEOUT=2000
# Number of instances managed by start/create/stop/call.
NODES=6
# Value passed to --cluster-replicas (replicas per master).
REPLICAS=1
PROTECTED_MODE=yes
ADDITIONAL_OPTIONS=""

# You may want to put the above config parameters into config.sh in order to
# override the defaults without modifying this script.
if [ -a config.sh ]
then
    source "config.sh"
fi

# Computed vars
# Last port of the range; every per-instance loop below stops once PORT
# reaches this value.
ENDPORT=$((PORT+NODES))

# start: launch one daemonized redis-server per port in cluster mode.
if [ "$1" == "start" ]
then
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        echo "Starting $PORT"
        # Each instance gets its own cluster config, AOF, RDB and log file,
        # all named after the port.
        $BIN_PATH/redis-server --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS}
    done
    exit 0
fi

# create: join the started instances into a cluster.
if [ "$1" == "create" ]
then
    HOSTS=""
    # Build the space-separated host:port list of all instances.
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        HOSTS="$HOSTS $CLUSTER_HOST:$PORT"
    done
    OPT_ARG=""
    # -f skips the interactive confirmation by passing --cluster-yes.
    if [ "$2" == "-f" ]; then
        OPT_ARG="--cluster-yes"
    fi
    # Delegate the actual cluster creation to redis-cli.
    $BIN_PATH/redis-cli --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG
    exit 0
fi

# stop: shut down every instance.
if [ "$1" == "stop" ]
then
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        echo "Stopping $PORT"
        # SHUTDOWN NOSAVE is sent to each instance in turn.
        $BIN_PATH/redis-cli -p $PORT shutdown nosave
    done
    exit 0
fi

# watch: refresh the CLUSTER NODES output of the first instance every second.
if [ "$1" == "watch" ]
then
    PORT=$((PORT+1))
    while [ 1 ]; do
        clear
        date
        $BIN_PATH/redis-cli -p $PORT cluster nodes | head -30
        sleep 1
    done
    exit 0
fi

# tail <id>: follow the log of the instance at base port + id.
if [ "$1" == "tail" ]
then
    INSTANCE=$2
    PORT=$((PORT+INSTANCE))
    tail -f ${PORT}.log
    exit 0
fi

# tailall: follow the logs of all instances.
if [ "$1" == "tailall" ]
then
    tail -f *.log
    exit 0
fi

# call <cmd> [args...]: run the same command against every instance.
if [ "$1" == "call" ]
then
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        $BIN_PATH/redis-cli -p $PORT $2 $3 $4 $5 $6 $7 $8 $9
    done
    exit 0
fi

# clean: remove logs, AOF files, RDB dumps and cluster config files.
if [ "$1" == "clean" ]
then
    rm -rf *.log
    rm -rf appendonly*.aof
    rm -rf dump*.rdb
    rm -rf nodes*.conf
    exit 0
fi

# clean-logs: remove only the log files.
if [ "$1" == "clean-logs" ]
then
    rm -rf *.log
    exit 0
fi

# Unknown or missing subcommand: print the usage. The summary line lists every
# subcommand handled above, including tailall and clean-logs.
echo "Usage: $0 [start|create|stop|watch|tail|tailall|clean|clean-logs|call]"
echo "start -- Launch Redis Cluster instances."
echo "create [-f] -- Create a cluster using redis-cli --cluster create."
echo "stop -- Stop Redis Cluster instances."
echo "watch -- Show CLUSTER NODES output (first 30 lines) of first node."
echo "tail <id> -- Run tail -f of instance at base port + ID."
echo "tailall -- Run tail -f for all the log files at once."
echo "clean -- Remove all instances data, logs, configs."
echo "clean-logs -- Remove just instances logs."
echo "call <cmd> -- Call a command (up to 7 arguments) on all nodes."
分析create-cluster脚本,得出以下结论:
-
start, 实际上直接使用redis-server运行多个redis实例
-
create, 调用redis-cli --cluster create 创建集群
redis-cli --cluster create 源码剖析
redis源码版本:6.2.5
--cluster 命令处理函数定义
// path:src/redis-cli.c
// Describes one `--cluster` sub-command: its name, handler and argument rules.
typedef struct clusterManagerCommandDef {
    // Sub-command name as typed after `--cluster`.
    char *name;
    // Handler function invoked for this sub-command.
    clusterManagerCommandProc *proc;
    // Expected number of arguments:
    //   < 0: at least -arity arguments are required
    //   > 0: exactly arity arguments are required
    int arity;
    // Description of the mandatory arguments.
    char *args;
    // Hint text listing the optional arguments.
    char *options;
} clusterManagerCommandDef;
// Table of all `--cluster` sub-commands. Each entry follows the field order of
// clusterManagerCommandDef: name, handler, arity, required args, options.
clusterManagerCommandDef clusterManagerCommands[] = {
    // "create" is handled by clusterManagerCommandCreate; arity -2 means at
    // least two arguments are required.
    {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
     "replicas <arg>"},
    {"check", clusterManagerCommandCheck, -1, "host:port",
     "search-multiple-owners"},
    {"info", clusterManagerCommandInfo, -1, "host:port", NULL},
    {"fix", clusterManagerCommandFix, -1, "host:port",
     "search-multiple-owners,fix-with-unreachable-masters"},
    {"reshard", clusterManagerCommandReshard, -1, "host:port",
     "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
     "replace"},
    {"rebalance", clusterManagerCommandRebalance, -1, "host:port",
     "weight <node1=w1...nodeN=wN>,use-empty-masters,"
     "timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
    {"add-node", clusterManagerCommandAddNode, 2,
     "new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
    {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
    {"call", clusterManagerCommandCall, -2,
     "host:port command arg arg .. arg", "only-masters,only-replicas"},
    {"set-timeout", clusterManagerCommandSetTimeout, 2,
     "host:port milliseconds", NULL},
    {"import", clusterManagerCommandImport, 1, "host:port",
     "from <arg>,from-user <arg>,from-pass <arg>,from-askpass,copy,replace"},
    {"backup", clusterManagerCommandBackup, 2, "host:port backup_directory",
     NULL},
    {"help", clusterManagerCommandHelp, 0, NULL, NULL}
};
--cluster 命令调用链
// path:src/redis-cli.c
// Store the `--cluster` sub-command name and its arguments in the global
// config.cluster_manager_command.
//
// cmdname: sub-command name.
// argc:    number of sub-command arguments; when 0 the stored argv is NULL.
// argv:    pointer to the first sub-command argument.
static void createClusterManagerCommand(char *cmdname, int argc, char **argv) {
    clusterManagerCommand *target = &config.cluster_manager_command;
    char **stored_args = NULL;

    if (argc) {
        stored_args = argv;
    }
    target->name = cmdname;
    target->argc = argc;
    target->argv = stored_args;
    // Turn on the color flag when isColorTerm() reports a color terminal.
    if (isColorTerm()) {
        target->flags = target->flags | CLUSTER_MANAGER_CMD_FLAG_COLOR;
    }
}
/*------------------------------------------------------------------------------
* User interface
*--------------------------------------------------------------------------- */
// Parse the command-line options into the global `config`.
//
// Scanning starts at argv[1] and stops at the first argument that does not
// begin with '-' (taken to be the command to run). An unrecognized option
// prints an error and exits with status 1.
//
// Returns the index of the first argument that was not consumed as an option.
//
// NOTE: this is an excerpt; the "omitted" markers stand for option branches
// that are not shown here.
static int parseOptions(int argc, char **argv) {
    int i;

    for (i = 1; i < argc; i++) {
        int lastarg = i==argc-1;

        if (!strcmp(argv[i],"-h") && !lastarg) {
            sdsfree(config.hostip);
            config.hostip = sdsnew(argv[++i]);
        } // ... (other options omitted)
        else if (!strcmp(argv[i],"--cluster") && !lastarg) {
            // `--cluster <cmd> [args...]`: only one --cluster is allowed.
            if (CLUSTER_MANAGER_MODE()) usage();
            char *cmd = argv[++i];
            // Advance j to the last argument before the next '-' option (or
            // the end of argv); everything in (i, j] belongs to the
            // sub-command.
            int j = i;
            while (j < argc && argv[j][0] != '-') j++;
            if (j > i) j--;
            // Save the sub-command name and its j - i arguments.
            createClusterManagerCommand(cmd, j - i, argv + i + 1);
            i = j;
        } else if (!strcmp(argv[i],"--cluster") && lastarg) {
            usage();
        } else if ((!strcmp(argv[i],"--cluster-only-masters"))) {
            config.cluster_manager_command.flags |=
                    CLUSTER_MANAGER_CMD_FLAG_MASTERS_ONLY;
        } else if ((!strcmp(argv[i],"--cluster-only-replicas"))) {
            config.cluster_manager_command.flags |=
                    CLUSTER_MANAGER_CMD_FLAG_SLAVES_ONLY;
        } else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) {
            // Number of replicas per master, stored in
            // config.cluster_manager_command for the create handler.
            config.cluster_manager_command.replicas = atoi(argv[++i]);
        } // ... (other options omitted)
        else {
            if (argv[i][0] == '-') {
                fprintf(stderr,
                    "Unrecognized option or bad number of args for: '%s'\n",
                    argv[i]);
                exit(1);
            } else {
                /* Likely the command name, stop here. */
                break;
            }
        }
    }

    /* --ldb requires --eval. */
    if (config.eval_ldb && config.eval == NULL) {
        fprintf(stderr,"Options --ldb and --ldb-sync-mode require --eval.\n");
        fprintf(stderr,"Try %s --help for more information.\n", argv[0]);
        exit(1);
    }

    if (!config.no_auth_warning && config.auth != NULL) {
        fputs("Warning: Using a password with '-a' or '-u' option on the command"
              " line interface may not be safe.\n", stderr);
    }

    return i;
}
// Look up the handler for the `--cluster` sub-command stored in
// config.cluster_manager_command and validate its argument count.
//
// Returns the handler, or NULL (after printing an error to stderr) when the
// sub-command is unknown or the number of arguments does not satisfy its
// arity.
static clusterManagerCommandProc *validateClusterManagerCommand(void) {
    int i, commands_count = sizeof(clusterManagerCommands) /
                            sizeof(clusterManagerCommandDef);
    clusterManagerCommandProc *proc = NULL;
    char *cmdname = config.cluster_manager_command.name;
    int argc = config.cluster_manager_command.argc;

    // Walk the sub-command table looking for a name match.
    for (i = 0; i < commands_count; i++) {
        clusterManagerCommandDef cmddef = clusterManagerCommands[i];
        if (!strcmp(cmddef.name, cmdname)) {
            // arity > 0: exactly that many arguments;
            // arity < 0: at least -arity arguments.
            if ((cmddef.arity > 0 && argc != cmddef.arity) ||
                (cmddef.arity < 0 && argc < (cmddef.arity * -1))) {
                fprintf(stderr, "[ERR] Wrong number of arguments for "
                                "specified --cluster sub command\n");
                return NULL;
            }
            proc = cmddef.proc;
        }
    }
    if (!proc) fprintf(stderr, "Unknown --cluster subcommand\n");
    return proc;
}
/* Execute redis-cli in Cluster Manager mode.
 *
 * Runs the given sub-command handler with the arguments saved in
 * config.cluster_manager_command, releases the cluster manager state and
 * terminates the process: exit status 0 when the handler returns non-zero,
 * 1 when it returns 0. This function never returns. */
static void clusterManagerMode(clusterManagerCommandProc *proc) {
    char **handler_argv = config.cluster_manager_command.argv;
    int handler_argc = config.cluster_manager_command.argc;
    int exit_status;

    cluster_manager.nodes = NULL;
    /* A zero return value from the handler means failure. */
    exit_status = proc(handler_argc, handler_argv) ? 0 : 1;
    /* Resources are released on both the success and the failure path. */
    freeClusterManager();
    exit(exit_status);
}
/*------------------------------------------------------------------------------
* Program main()
*--------------------------------------------------------------------------- */
int main(int argc, char **argv) {
int firstarg;
struct timeval tv;
memset(&config.sslconfig, 0, sizeof(config.sslconfig));
// ...
//
config.cluster_manager_command.name = NULL;
config.cluster_manager_command.argc = 0;
config.cluster_manager_command.argv = NULL;
config.cluster_manager_command.flags = 0;
config.cluster_manager_command.replicas = 0;
config.cluster_manager_command.from = NULL;
config.cluster_manager_command.to = NULL;
config.cluster_manager_command.from_user = NULL;
config.cluster_manager_command.from_pass = NULL;
config.cluster_manager_command.from_askpass = 0;
config.cluster_manager_command.weight = NULL;
config.cluster_manager_command.weight_argc = 0;
config.cluster_manager_command.slots = 0;
config.cluster_manager_command.timeout = CLUSTER_MANAGER_MIGRATE_TIMEOUT;
config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE;
config.cluster_manager_command.threshold =
// 解析命令行参数
firstarg = parseOptions(argc,argv);
argc -= firstarg;
argv += firstarg;
// --cluster 命令处理
/* Cluster Manager mode */
if (CLUSTER_MANAGER_MODE()) {
// --cluster cmd
// 根据cmd 获取命令处理函数
clusterManagerCommandProc *proc = validateClusterManagerCommand();
if (!proc) {
exit(1);
}
// 处理命令
clusterManagerMode(proc);
}
// 其他命令处理
}
调用链为:
int main(int argc, char **argv)
- static int parseOptions(int argc, char **argv) // 解析命令参数,保存到全局变量config中
- static clusterManagerCommandProc *validateClusterManagerCommand(void) // 根据cmd 获取处理函数
- static void clusterManagerMode(clusterManagerCommandProc *proc) // 执行proc处理函数
- proc(argc, argv)// 执行 proc, --cluster create 对应clusterManagerCommandCreate函数
--cluster create命令处理
// path:src/redis-cli.c
/* Cluster Manager Commands */
// Handler for `redis-cli --cluster create host1:port1 ... hostN:portN`.
//
// argc/argv: the node addresses that follow the sub-command. Each address is
//            modified in place while it is split into ip and port.
//
// Steps: connect to and validate every node, pick the masters, assign slots
// and replicas in memory, ask the user for confirmation, then push the
// configuration to the nodes and join them with CLUSTER MEET.
//
// Returns 0 when any step fails and 1 otherwise (including when the user
// declines the proposed configuration).
static int clusterManagerCommandCreate(int argc, char **argv) {
    int i, j, success = 1;
    cluster_manager.nodes = listCreate();
    for (i = 0; i < argc; i++) {
        // Each argument is one instance address in ip:port form; anything
        // after a trailing '@' is cut off before parsing.
        char *addr = argv[i];
        char *c = strrchr(addr, '@');
        if (c != NULL) *c = '\0';
        c = strrchr(addr, ':');
        if (c == NULL) {
            fprintf(stderr, "Invalid address format: %s\n", addr);
            return 0;
        }
        // Split at the last ':' so addr becomes the ip and c + 1 the port.
        *c = '\0';
        char *ip = addr;
        int port = atoi(++c);
        // Create a clusterManagerNode for this ip/port and connect to it.
        clusterManagerNode *node = clusterManagerNewNode(ip, port);
        if (!clusterManagerNodeConnect(node)) {
            freeClusterManagerNode(node);
            return 0;
        }
        char *err = NULL;
        // The instance must have been started in cluster mode.
        if (!clusterManagerNodeIsCluster(node, &err)) {
            clusterManagerPrintNotClusterNodeError(node, err);
            if (err) zfree(err);
            freeClusterManagerNode(node);
            return 0;
        }
        err = NULL;
        // Load the node's own cluster information (per the original notes
        // this runs CLUSTER NODES; a fresh node normally knows only itself
        // and owns no slots).
        if (!clusterManagerNodeLoadInfo(node, 0, &err)) {
            if (err) {
                CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
                zfree(err);
            }
            freeClusterManagerNode(node);
            return 0;
        }
        err = NULL;
        /* Checks whether the node is empty. Node is considered not-empty if it has
         * some key or if it already knows other nodes */
        if (!clusterManagerNodeIsEmpty(node, &err)) {
            clusterManagerPrintNotEmptyNodeError(node, err);
            if (err) zfree(err);
            freeClusterManagerNode(node);
            return 0;
        }
        listAddNodeTail(cluster_manager.nodes, node);
    }
    int node_len = cluster_manager.nodes->len;
    int replicas = config.cluster_manager_command.replicas;
    // Number of masters derived from the node count and the replicas per
    // master. In the original article:
    // #define CLUSTER_MANAGER_MASTERS_COUNT(nodes, replicas) (nodes/(replicas + 1))
    int masters_count = CLUSTER_MANAGER_MASTERS_COUNT(node_len, replicas);
    // Fewer than 3 masters is rejected.
    if (masters_count < 3) {
        clusterManagerLogErr(
            "*** ERROR: Invalid configuration for cluster creation.\n"
            "*** Redis Cluster requires at least 3 master nodes.\n"
            "*** This is not possible with %d nodes and %d replicas per node.",
            node_len, replicas);
        clusterManagerLogErr("\n*** At least %d nodes are required.\n",
                             3 * (replicas + 1));
        return 0;
    }
    clusterManagerLogInfo(">>> Performing hash slots allocation "
                          "on %d nodes...\n", node_len);
    int interleaved_len = 0, ip_count = 0;
    // `interleaved` is an array of node pointers, so size it by the pointer
    // type (the original used sizeof(**interleaved), i.e. the whole struct).
    clusterManagerNode **interleaved = zcalloc(node_len*sizeof(*interleaved));
    char **ips = zcalloc(node_len * sizeof(char*));
    clusterManagerNodeArray *ip_nodes = zcalloc(node_len * sizeof(*ip_nodes));
    listIter li;
    listNode *ln;
    listRewind(cluster_manager.nodes, &li);
    // Group the nodes by ip: ips[k] is the k-th distinct ip and ip_nodes[k]
    // the nodes that share it.
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *n = ln->value;
        int found = 0;
        for (i = 0; i < ip_count; i++) {
            char *ip = ips[i];
            if (!strcmp(ip, n->ip)) {
                found = 1;
                break;
            }
        }
        if (!found) {
            ips[ip_count++] = n->ip;
        }
        // Here i is the index of the node's ip: either where the search
        // stopped or the slot that was just appended.
        clusterManagerNodeArray *node_array = &(ip_nodes[i]);
        if (node_array->nodes == NULL)
            clusterManagerNodeArrayInit(node_array, node_len);
        clusterManagerNodeArrayAdd(node_array, n);
    }
    // Interleave the nodes round-robin across ips so that nodes sharing an
    // ip are spread out. For example
    //   ip1:port1, ip1:port2, ip1:port3, ip2:port1, ip2:port2, ip3:port1
    // becomes
    //   ip1:port1, ip2:port1, ip3:port1, ip1:port2, ip2:port2, ip1:port3
    while (interleaved_len < node_len) {
        for (i = 0; i < ip_count; i++) {
            clusterManagerNodeArray *node_array = &(ip_nodes[i]);
            if (node_array->count > 0) {
                clusterManagerNode *n = NULL;
                clusterManagerNodeArrayShift(node_array, &n);
                interleaved[interleaved_len++] = n;
            }
        }
    }
    // The first masters_count interleaved nodes become masters; `masters`
    // also keeps the base pointer of the allocation for the final zfree().
    clusterManagerNode **masters = interleaved;
    // The rest is the window of not-yet-assigned nodes (future replicas).
    interleaved += masters_count;
    interleaved_len -= masters_count;
    // Split the slot range evenly across the masters. In the original
    // article: #define CLUSTER_MANAGER_SLOTS 16384
    float slots_per_node = CLUSTER_MANAGER_SLOTS / (float) masters_count;
    long first = 0;
    float cursor = 0.0f;
    for (i = 0; i < masters_count; i++) {
        clusterManagerNode *master = masters[i];
        long last = lround(cursor + slots_per_node - 1);
        // The last master always takes everything up to the final slot.
        if (last > CLUSTER_MANAGER_SLOTS || i == (masters_count - 1))
            last = CLUSTER_MANAGER_SLOTS - 1;
        if (last < first) last = first;
        printf("Master[%d] -> Slots %ld - %ld\n", i, first, last);
        master->slots_count = 0;
        for (j = first; j <= last; j++) {
            master->slots[j] = 1;
            master->slots_count++;
        }
        // Mark the node so its configuration is flushed later.
        master->dirty = 1;
        first = last + 1;
        cursor += slots_per_node;
    }
    /* Rotating the list sometimes helps to get better initial
     * anti-affinity before the optimizer runs. */
    // Rotate the unassigned window left by one. Skipped when the window is
    // empty (no spare nodes), where the original wrote to interleaved[-1].
    if (interleaved_len > 0) {
        clusterManagerNode *first_node = interleaved[0];
        for (i = 0; i < (interleaved_len - 1); i++)
            interleaved[i] = interleaved[i + 1];
        interleaved[interleaved_len - 1] = first_node;
    }
    int assign_unused = 0, available_count = interleaved_len;
assign_replicas:
    for (i = 0; i < masters_count; i++) {
        clusterManagerNode *master = masters[i];
        int assigned_replicas = 0;
        // Give this master up to `replicas` replicas.
        while (assigned_replicas < replicas) {
            if (available_count == 0) break;
            clusterManagerNode *found = NULL, *slave = NULL;
            int firstNodeIdx = -1;
            for (j = 0; j < interleaved_len; j++) {
                clusterManagerNode *n = interleaved[j];
                if (n == NULL) continue;
                // Prefer an unassigned node whose ip differs from the
                // master's.
                if (strcmp(n->ip, master->ip)) {
                    found = n;
                    interleaved[j] = NULL;
                    break;
                }
                // Remember the first unassigned node as a fallback.
                if (firstNodeIdx < 0) firstNodeIdx = j;
            }
            if (found) slave = found;
            else if (firstNodeIdx >= 0) {
                // No node on a different ip: take the first unassigned one
                // and drop it, together with the already-consumed entries
                // before it, from the window. The window must shrink by the
                // same amount the base pointer advances; the original
                // expression enlarged it by firstNodeIdx instead.
                slave = interleaved[firstNodeIdx];
                interleaved_len -= (firstNodeIdx + 1);
                interleaved += (firstNodeIdx + 1);
            }
            if (slave != NULL) {
                assigned_replicas++;
                available_count--;
                if (slave->replicate) sdsfree(slave->replicate);
                slave->replicate = sdsnew(master->name);
                slave->dirty = 1;
            } else break;
            printf("Adding replica %s:%d to %s:%d\n", slave->ip, slave->port,
                   master->ip, master->port);
            // In the extra pass each master receives at most one more
            // replica.
            if (assign_unused) break;
        }
    }
    // Nodes are still left after every master got its replicas: run one
    // extra pass to hand them out.
    if (!assign_unused && available_count > 0) {
        assign_unused = 1;
        printf("Adding extra replicas...\n");
        goto assign_replicas;
    }
    // Reset the per-ip node arrays (emptied by the shifts above) before they
    // are handed to the anti-affinity optimizer.
    for (i = 0; i < ip_count; i++) {
        clusterManagerNodeArray *node_array = ip_nodes + i;
        clusterManagerNodeArrayReset(node_array);
    }
    clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);
    // Print the proposed layout.
    clusterManagerShowNodes();
    int ignore_force = 0;
    // Ask the user to accept the proposed configuration.
    if (confirmWithYes("Can I set the above configuration?", ignore_force)) {
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;
            char *err = NULL;
            // Masters get their slots, replicas are attached to a master.
            int flushed = clusterManagerFlushNodeConfig(node, &err);
            if (!flushed && node->dirty && !node->replicate) {
                // A node that is not a replica failed: abort. Replica
                // failures are tolerated here and retried after the join.
                if (err != NULL) {
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
                    zfree(err);
                }
                success = 0;
                goto cleanup;
            } else if (err != NULL) zfree(err);
        }
        clusterManagerLogInfo(">>> Nodes configuration updated\n");
        clusterManagerLogInfo(">>> Assign a different config epoch to "
                              "each node\n");
        int config_epoch = 1;
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            // Give every node a distinct config epoch via
            // CLUSTER SET-CONFIG-EPOCH; the reply is not inspected.
            clusterManagerNode *node = ln->value;
            redisReply *reply = NULL;
            reply = CLUSTER_MANAGER_COMMAND(node,
                                            "cluster set-config-epoch %d",
                                            config_epoch++);
            if (reply != NULL) freeReplyObject(reply);
        }
        clusterManagerLogInfo(">>> Sending CLUSTER MEET messages to join "
                              "the cluster\n");
        clusterManagerNode *first = NULL;
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;
            if (first == NULL) {
                // Remember the first node; it is the meeting point.
                first = node;
                continue;
            }
            // Send CLUSTER MEET to every other node, telling it to meet the
            // first node.
            redisReply *reply = NULL;
            reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d",
                                            first->ip, first->port);
            int is_err = 0;
            if (reply != NULL) {
                if ((is_err = reply->type == REDIS_REPLY_ERROR))
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, reply->str);
                freeReplyObject(reply);
            } else {
                is_err = 1;
                fprintf(stderr, "Failed to send CLUSTER MEET command.\n");
            }
            if (is_err) {
                success = 0;
                goto cleanup;
            }
        }
        /* Give one second for the join to start, in order to avoid that
         * waiting for cluster join will find all the nodes agree about
         * the config as they are still empty with unassigned slots. */
        sleep(1);
        /* Wait until the cluster configuration is consistent. */
        // Per the original notes this polls CLUSTER NODES on every node once
        // per second until all nodes report the same view (roles and slot
        // ownership).
        clusterManagerWaitForClusterJoin();
        /* Useful for the replicas */
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;
            if (!node->dirty) continue;
            char *err = NULL;
            // Retry the nodes whose first flush did not succeed.
            int flushed = clusterManagerFlushNodeConfig(node, &err);
            if (!flushed && !node->replicate) {
                if (err != NULL) {
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
                    zfree(err);
                }
                success = 0;
                goto cleanup;
            } else if (err != NULL) {
                // Same as the first flush loop: do not leak the message of
                // a tolerated failure.
                zfree(err);
            }
        }
        // Reset Nodes: keep only the first node and free the others, then
        // rebuild the node list from the first node's view.
        listRewind(cluster_manager.nodes, &li);
        clusterManagerNode *first_node = NULL;
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;
            if (!first_node) first_node = node;
            else freeClusterManagerNode(node);
        }
        listEmpty(cluster_manager.nodes);
        if (!clusterManagerLoadInfoFromNode(first_node, 0)) {
            success = 0;
            goto cleanup;
        }
        // Final check of the new cluster (per the original notes:
        // configuration consistency across nodes and slot coverage/open
        // slots).
        clusterManagerCheckCluster(0);
    }
cleanup:
    /* Free everything */
    // `masters` is the base of the interleaved array allocation.
    zfree(masters);
    zfree(ips);
    for (i = 0; i < node_len; i++) {
        clusterManagerNodeArray *node_array = ip_nodes + i;
        CLUSTER_MANAGER_NODE_ARRAY_FREE(node_array);
    }
    zfree(ip_nodes);
    return success;
}
// Print one line of information for every node in cluster_manager.nodes.
static void clusterManagerShowNodes(void) {
    listIter li;
    listNode *ln;

    listRewind(cluster_manager.nodes, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *node = ln->value;
        // clusterManagerNodeInfo() returns an sds string that must be freed
        // by the caller.
        sds info = clusterManagerNodeInfo(node, 0);
        printf("%s\n", (char *) info);
        sdsfree(info);
    }
}
/* Flush the dirty node configuration by calling replicate for slaves or
 * adding the slots defined in the masters.
 *
 * node: node whose pending configuration is applied.
 * err:  optional out-parameter. It is reset to NULL first; when CLUSTER
 *       REPLICATE returns an error reply it receives a zmalloc'ed copy of
 *       the error text, which the caller must zfree().
 *
 * Returns 1 on success. Returns 0 on failure and also when the node is not
 * dirty (nothing to flush). */
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) {
    if (!node->dirty) return 0;
    redisReply *reply = NULL;
    int is_err = 0, success = 1;
    if (err != NULL) *err = NULL;
    if (node->replicate != NULL) {
        /* Replica: attach it to its master with CLUSTER REPLICATE. */
        reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER REPLICATE %s",
                                        node->replicate);
        if (reply == NULL || (is_err = (reply->type == REDIS_REPLY_ERROR))) {
            if (is_err && err != NULL) {
                *err = zmalloc((reply->len + 1) * sizeof(char));
                strcpy(*err, reply->str);
            }
            success = 0;
            /* If the cluster did not already joined it is possible that
             * the slave does not know the master node yet. So on errors
             * we return ASAP leaving the dirty flag set, to flush the
             * config later. */
            goto cleanup;
        }
    } else {
        /* Master: register the slots it owns (presumably via CLUSTER
         * ADDSLOTS inside clusterManagerAddSlots -- not visible here). */
        int added = clusterManagerAddSlots(node, err);
        /* err is optional everywhere else in this function, so guard the
         * dereference instead of reading *err unconditionally. */
        if (!added || (err != NULL && *err != NULL)) success = 0;
    }
    /* Note: for a master the dirty flag is cleared even when adding the
     * slots failed. */
    node->dirty = 0;
cleanup:
    if (reply != NULL) freeReplyObject(reply);
    return success;
}
clusterManagerCommandCreate整体流程:
-
对每个redis实例建立链接
-
检查是否已经在其它集群中或者已经保存了key
-
根据节点数量和设置的副本数,计算master 数量,并且检查是否少于3个
-
根据ips 统计信息,重新排序node 列表, 同个ip节点尽量不相邻
-
截取排完序后的前masters_count个节点作为master 节点
-
为master 节点分配slot
-
为master 节点分配对应的slave
-
打印集群节点和slot 分配情况,询问是否按照这个配置创建集群
-
用户同意创建集群,遍历集群节点: 对master 节点分配slot;对slave 节点建立主从关系
-
遍历redis 实例节点,调用cluster set-config-epoch
-
向除第一个节点以外的其余每个节点发送 cluster meet <第一个节点ip> <第一个节点port>,让它们分别与第一个节点握手,从而建立ping/pong 联系
-
等待所有集群节点建立共识,也就是集群状态一致
-
对之前slave 建立主从关系 失败的节点,继续执行,与master建立主从复制
-
根据第一个节点获取集群信息,检查集群的状态, 包括节点配置一致性 和 open slot 问题
原文始发于微信公众号(吃瓜技术派):Redis 集群创建流程与源码实现剖析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/236065.html