Redis 集群创建流程与源码实现剖析

面对大数据量和高并发场景,Redis集群是一种可扩展和高可用的部署模式。那么,Redis集群是如何创建的呢?本文将从源码级别详细剖析集群的创建过程。

搭建redis集群

首先介绍下如何搭建redis集群

前置准备

  1. 下载redis源码:

git clone git@github.com:redis/redis.git
cd redis
git checkout v6
  1. 编译源码

make 

编译链接后,生成一些可执行文件:
redis-server, redis服务端可执行文件
redis-cli, redis客户端,用户链接redis实例执行命令

创建集群

1. 启动Redis实例

cd utils/create-cluster 
./create-cluster start

运行命令后,脚本启动6个redis节点

Starting 30001 #redis实例监听端口个
Starting 30002
Starting 30003
Starting 30004
Starting 30005
Starting 30006

同时可以在create-cluster目录下看到不同实例的集群配置文件(nodes-****.conf),日志文件,和aof 文件

30001.log
30002.log
30003.log
30004.log
30005.log
30006.log
appendonly-30001.aof
appendonly-30002.aof
appendonly-30003.aof
appendonly-30004.aof
appendonly-30005.aof
appendonly-30006.aof
nodes-30001.conf
nodes-30002.conf
nodes-30003.conf
nodes-30004.conf
nodes-30005.conf
nodes-30006.conf


2.创建集群

./create-cluster create

运行命令后,便将上一步运行的实例组成3主3从的redis集群:

redis-cli -p 30001
127.0.0.1:30001cluster nodes
3385e2a8bf971f9e4fb04b2045174d84517ddd51 127.0.0.1:30003@40003 master - 0 1691231147062 3 connected 10923-16383
6d2dca34675d5457e994df9c4cb7b4197cd5848c 127.0.0.1:30005@40005 slave d0b63c70738d6dffc48a4bee7448abb702479b2e 0 1691231147164 1 connected
d0b63c70738d6dffc48a4bee7448abb702479b2e 127.0.0.1:30001@40001 myself,master - 0 1691231147000 1 connected 0-5460
4f4ac20ec2a7982b684292fdacff7403e16a5d6a 127.0.0.1:30002@40002 master - 0 1691231146960 2 connected 5461-10922
f653d09280f7ac019b4a923c967f547c74eb7164 127.0.0.1:30004@40004 slave 3385e2a8bf971f9e4fb04b2045174d84517ddd51 0 1691231147062 3 connected
4aa656d1423c5c9352f0c03c3aa2b9619b247d5a 127.0.0.1:30006@40006 slave 4f4ac20ec2a7982b684292fdacff7403e16a5d6a 0 1691231147164 2 connected

综上,redis集群的创建流程:

  1. create-cluster start 创建redis 实例

  2. create-cluster create 对第一步创建的实例列表建立集群关系

create-cluster脚本剖析

#!/bin/bash

# Settings 
# 参数配置

# redis可执行文件所在文件夹
BIN_PATH="../../src/"

# 启动redis时 监听的ip
CLUSTER_HOST=127.0.0.1

# 启动redis时 监听端口(第一个实例)
PORT=30000

# redis集群ping/pong 超时时间
TIMEOUT=2000

# ./create-cluster start时创建的实例个数
NODES=6

# ./create-cluster create 创建集群时, 每个master有多少个从节点
REPLICAS=1

PROTECTED_MODE=yes
ADDITIONAL_OPTIONS=""

# You may want to put the above config parameters into config.sh in order to
# override the defaults without modifying this script.

# 覆盖默认参数, 可以在脚本同目录下创建config.sh覆盖
if [ -a config.sh ]
then
    source "config.sh"
fi

# Computed vars
# 根据实例个数计算出 截止监听端口
# 用于start 命令 停止循环
ENDPORT=$((PORT+NODES))

# start 命令
if [ "$1" == "start" ]
then
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        echo "Starting $PORT"
        # 使用redis-server 二进制逐个启动 redis 实例
        $BIN_PATH/redis-server --port $PORT  --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS}
    done
    exit 0
fi

# 创建集群命令
if [ "$1" == "create" ]
then
    HOSTS=""
    # 拼接所有实例的ip 端口
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        HOSTS="$HOSTS $CLUSTER_HOST:$PORT"
    done
    OPT_ARG=""
    if [ "$2" == "-f" ]; then
        OPT_ARG="--cluster-yes"
    fi
    # 使用redis-cli --cluster create 创建集群
    $BIN_PATH/redis-cli --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG
    exit 0
fi

# 停止实例运行
if [ "$1" == "stop" ]
then
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        echo "Stopping $PORT"
        # 逐个实例 调用shutdown nosave 停止运行
        $BIN_PATH/redis-cli -p $PORT shutdown nosave
    done
    exit 0
fi

# watch redis集群节点信息
if [ "$1" == "watch" ]
then
    PORT=$((PORT+1))
    while [ 1 ]; do
        clear
        date
        # 调用第一个节点 cluster nodes命令获取集群信息
        $BIN_PATH/redis-cli -p $PORT cluster nodes | head -30
        sleep 1
    done
    exit 0
fi

# 查看对应redis实例日志
if [ "$1" == "tail" ]
then
    INSTANCE=$2
    PORT=$((PORT+INSTANCE))
    tail -f ${PORT}.log
    exit 0
fi

# 查看所有实例日志
if [ "$1" == "tailall" ]
then
    tail -f *.log
    exit 0
fi

# 对所有实例运行明个命令
if [ "$1" == "call" ]
then
    while [ $((PORT < ENDPORT)) != "0" ]; do
        PORT=$((PORT+1))
        $BIN_PATH/redis-cli -p $PORT $2 $3 $4 $5 $6 $7 $8 $9
    done
    exit 0
fi

# 清除日志文件,配置文件,aof文件
if [ "$1" == "clean" ]
then
    rm -rf *.log
    rm -rf appendonly*.aof
    rm -rf dump*.rdb
    rm -rf nodes*.conf
    exit 0
fi


# 清除日志文件
if [ "$1" == "clean-logs" ]
then
    rm -rf *.log
    exit 0
fi

echo "Usage: $0 [start|create|stop|watch|tail|clean|call]"
echo "start       -- Launch Redis Cluster instances."
echo "create [-f] -- Create a cluster using redis-cli --cluster create."
echo "stop        -- Stop Redis Cluster instances."
echo "watch       -- Show CLUSTER NODES output (first 30 lines) of first node."
echo "tail <id>   -- Run tail -f of instance at base port + ID."
echo "tailall     -- Run tail -f for all the log files at once."
echo "clean       -- Remove all instances data, logs, configs."
echo "clean-logs  -- Remove just instances logs."
echo "call <cmd>  -- Call a command (up to 7 arguments) on all nodes."

分析create-cluster脚本,得出以下结论:

  • start, 实际上直接使用redis-server运行多个redis实例

  • create, 调用redis-cli –cluster create 创建集群

redis-cli –cluster create 源码剖析

redis源码版本:6.2.5

–cluster 命令处理函数定义

// path:src/redis-cli.c
// 命令定义结构
typedef struct clusterManagerCommandDef {
    // 命令名字
    char *name;
    // 命令处理函数
    clusterManagerCommandProc *proc;
    // 命令参数个数
    // <0,至少要-arity个
    // >0,等于arity个
    int arity;
    // 必带参数
    char *args;
    // 可选参数提示
    char *options;
} clusterManagerCommandDef;

// 不同--cluster 的命令定义
clusterManagerCommandDef clusterManagerCommands[] = {
    // create的处理函数为 clusterManagerCommandCreate,至少需要两个参数
    {"create", clusterManagerCommandCreate, -2"host1:port1 ... hostN:portN",
     "replicas <arg>"},
    {"check", clusterManagerCommandCheck, -1"host:port",
     "search-multiple-owners"},
    {"info", clusterManagerCommandInfo, -1"host:port"NULL},
    {"fix", clusterManagerCommandFix, -1"host:port",
     "search-multiple-owners,fix-with-unreachable-masters"},
    {"reshard", clusterManagerCommandReshard, -1"host:port",
     "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
     "replace"},
    {"rebalance", clusterManagerCommandRebalance, -1"host:port",
     "weight <node1=w1...nodeN=wN>,use-empty-masters,"
     "timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
    {"add-node", clusterManagerCommandAddNode, 2,
     "new_host:new_port existing_host:existing_port""slave,master-id <arg>"},
    {"del-node", clusterManagerCommandDeleteNode, 2"host:port node_id",NULL},
    {"call", clusterManagerCommandCall, -2,
        "host:port command arg arg .. arg""only-masters,only-replicas"},
    {"set-timeout", clusterManagerCommandSetTimeout, 2,
     "host:port milliseconds"NULL},
    {"import", clusterManagerCommandImport, 1"host:port",
     "from <arg>,from-user <arg>,from-pass <arg>,from-askpass,copy,replace"},
    {"backup", clusterManagerCommandBackup, 2,  "host:port backup_directory",
     NULL},
    {"help", clusterManagerCommandHelp, 0NULLNULL}
};

–cluster 命令调用链

// path:src/redis-cli.c

// 将--cluster cmd 和参数保存到全局变量config.cluster_manager_command中
static void createClusterManagerCommand(char *cmdname, int argc, char **argv) {
    clusterManagerCommand *cmd = &config.cluster_manager_command;
    cmd->name = cmdname;
    cmd->argc = argc;
    cmd->argv = argc ? argv : NULL;
    if (isColorTerm()) cmd->flags |= CLUSTER_MANAGER_CMD_FLAG_COLOR;
}

/*------------------------------------------------------------------------------
 * User interface
 *--------------------------------------------------------------------------- */

// 解析参数
static int parseOptions(int argc, char **argv) {
    int i;

    // 循环变量解析 参数
    for (i = 1; i < argc; i++) {
        int lastarg = i==argc-1;
        if (!strcmp(argv[i],"-h") && !lastarg) {
            sdsfree(config.hostip);
            config.hostip = sdsnew(argv[++i]);
        } //....
        else if (!strcmp(argv[i],"--cluster") && !lastarg) {
            // --cluster 命令,解析cmd 和参数
            if (CLUSTER_MANAGER_MODE()) usage();
            char *cmd = argv[++i];
            int j = i;
            while (j < argc && argv[j][0] != '-') j++;
            if (j > i) j--;
            // 保存cmd 和 参数
            createClusterManagerCommand(cmd, j - i, argv + i + 1);
            i = j;
        } else if (!strcmp(argv[i],"--cluster") && lastarg) {
            usage();
        } else if ((!strcmp(argv[i],"--cluster-only-masters"))) {
            config.cluster_manager_command.flags |=
                    CLUSTER_MANAGER_CMD_FLAG_MASTERS_ONLY;
        } else if ((!strcmp(argv[i],"--cluster-only-replicas"))) {
            config.cluster_manager_command.flags |=
                    CLUSTER_MANAGER_CMD_FLAG_SLAVES_ONLY;
        } else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) {
            // 带了--cluster-replicas, 设置master的副本数
            // 保存在config.cluster_manager_command中
            config.cluster_manager_command.replicas = atoi(argv[++i]);
        } // ..... 
        else {
            if (argv[i][0] == '-') {
                fprintf(stderr,
                    "Unrecognized option or bad number of args for: '%s'n",
                    argv[i]);
                exit(1);
            } else {
                /* Likely the command name, stop here. */
                break;
            }
        }
    }

    /* --ldb requires --eval. */
    if (config.eval_ldb && config.eval == NULL) {
        fprintf(stderr,"Options --ldb and --ldb-sync-mode require --eval.n");
        fprintf(stderr,"Try %s --help for more information.n", argv[0]);
        exit(1);
    }

    if (!config.no_auth_warning && config.auth != NULL) {
        fputs("Warning: Using a password with '-a' or '-u' option on the command"
              " line interface may not be safe.n"stderr);
    }

    return i;
}


// 根据cmd 获取处理函数
static clusterManagerCommandProc *validateClusterManagerCommand(void) {
    int i, commands_count = sizeof(clusterManagerCommands) /
                            sizeof(clusterManagerCommandDef);
    clusterManagerCommandProc *proc = NULL;
    char *cmdname = config.cluster_manager_command.name;
    int argc = config.cluster_manager_command.argc;
    // 遍历cluster 命令列表
    for (i = 0; i < commands_count; i++) {
        clusterManagerCommandDef cmddef = clusterManagerCommands[i];
        // 匹配命令
        if (!strcmp(cmddef.name, cmdname)) {
            // 检测参数数量
            if ((cmddef.arity > 0 && argc != cmddef.arity) ||
                (cmddef.arity < 0 && argc < (cmddef.arity * -1))) {
                fprintf(stderr"[ERR] Wrong number of arguments for "
                                "specified --cluster sub commandn");
                return NULL;
            }
            proc = cmddef.proc;
        }
    }
    if (!proc) fprintf(stderr"Unknown --cluster subcommandn");
    return proc;
}


/* Execute redis-cli in Cluster Manager mode */
// 执行--cluster 命令
static void clusterManagerMode(clusterManagerCommandProc *proc) {
    int argc = config.cluster_manager_command.argc;
    char **argv = config.cluster_manager_command.argv;
    cluster_manager.nodes = NULL;
    // 调用对应命令处理函数
    if (!proc(argc, argv)) goto cluster_manager_err;
    //释放资源, 退出
    freeClusterManager();
    exit(0);
cluster_manager_err:
    freeClusterManager();
    exit(1);
}



/*------------------------------------------------------------------------------
 * Program main()
 *--------------------------------------------------------------------------- */


int main(int argc, char **argv) {
    int firstarg;
    struct timeval tv;

    memset(&config.sslconfig, 0sizeof(config.sslconfig));
    // ...
    // 
    config.cluster_manager_command.name = NULL;
    config.cluster_manager_command.argc = 0;
    config.cluster_manager_command.argv = NULL;
    config.cluster_manager_command.flags = 0;
    config.cluster_manager_command.replicas = 0;
    config.cluster_manager_command.from = NULL;
    config.cluster_manager_command.to = NULL;
    config.cluster_manager_command.from_user = NULL;
    config.cluster_manager_command.from_pass = NULL;
    config.cluster_manager_command.from_askpass = 0;
    config.cluster_manager_command.weight = NULL;
    config.cluster_manager_command.weight_argc = 0;
    config.cluster_manager_command.slots = 0;
    config.cluster_manager_command.timeout = CLUSTER_MANAGER_MIGRATE_TIMEOUT;
    config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE;
    config.cluster_manager_command.threshold =

    // 解析命令行参数
    firstarg = parseOptions(argc,argv);
    argc -= firstarg;
    argv += firstarg;

    // --cluster 命令处理
    /* Cluster Manager mode */
    if (CLUSTER_MANAGER_MODE()) {
        // --cluster cmd
        // 根据cmd 获取命令处理函数
        clusterManagerCommandProc *proc = validateClusterManagerCommand();
        if (!proc) {
            exit(1);
        }
        // 处理命令
        clusterManagerMode(proc);
    }

    // 其他命令处理
}

调用链为:

int main(int argc, char **argv)
    - static int parseOptions(int argc, char **argv) // 解析命令参数,保存到全局变量config中
    - static clusterManagerCommandProc *validateClusterManagerCommand(void) // 根据cmd 获取处理函数
    - static void clusterManagerMode(clusterManagerCommandProc *proc) // 执行proc处理函数
        - proc(argc, argv)// 执行 proc, --cluster create 对应clusterManagerCommandCreate函数

–cluster create命令处理

// path:src/redis-cli.c
/* Cluster Manager Commands */
// --cluster create 对应clusterManagerCommandCreate函数
static int clusterManagerCommandCreate(int argc, char **argv) {
    int i, j, success = 1;
    cluster_manager.nodes = listCreate();
    for (i = 0; i < argc; i++) {
        // 每个参数都是每个redis实例, ip@port
        // 解析参数
        char *addr = argv[i];
        char *c = strrchr(addr, '@');
        if (c != NULL) *c = '';
        c = strrchr(addr, ':');
        if (c == NULL) {
            fprintf(stderr"Invalid address format: %sn", addr);
            return 0;
        }
        *c = '';
        char *ip = addr;
        int port = atoi(++c);
        // 根据ip端口创建一个clusterManagerNode
        clusterManagerNode *node = clusterManagerNewNode(ip, port);
        // 链接redis 实例
        if (!clusterManagerNodeConnect(node)) {
            freeClusterManagerNode(node);
            return 0;
        }
        char *err = NULL;
        //判断是否是cluster 模式启动的redis实例
        if (!clusterManagerNodeIsCluster(node, &err)) {
            clusterManagerPrintNotClusterNodeError(node, err);
            if (err) zfree(err);
            freeClusterManagerNode(node);
            return 0;
        }
        err = NULL;
        // 执行CLUSTER NODES,获取集群信息保存起来
        // 新建节点实例一般只有自己的集群信息,并且没有分配slot
        if (!clusterManagerNodeLoadInfo(node, 0, &err)) {
            if (err) {
                CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
                zfree(err);
            }
            freeClusterManagerNode(node);
            return 0;
        }
        err = NULL;
        /* Checks whether the node is empty. Node is considered not-empty if it has
        * some key or if it already knows other nodes */

        // 如果已经有key,或者与其他节点建立了集群关系,视为not-empty
        if (!clusterManagerNodeIsEmpty(node, &err)) {
            clusterManagerPrintNotEmptyNodeError(node, err);
            if (err) zfree(err);
            freeClusterManagerNode(node);
            return 0;
        }
        listAddNodeTail(cluster_manager.nodes, node);
    } // for 循环结束

    int node_len = cluster_manager.nodes->len;
    int replicas = config.cluster_manager_command.replicas;
    // 根据节点数量 和 每个master的replicas 计算出有多少个master
    // #define CLUSTER_MANAGER_MASTERS_COUNT(nodes, replicas) (nodes/(replicas + 1))
    int masters_count = CLUSTER_MANAGER_MASTERS_COUNT(node_len, replicas);
    // 可以当master的节点数小于3, 不符合建集群
    if (masters_count < 3) {
        clusterManagerLogErr(
            "*** ERROR: Invalid configuration for cluster creation.n"
            "*** Redis Cluster requires at least 3 master nodes.n"
            "*** This is not possible with %d nodes and %d replicas per node.",
            node_len, replicas);
        clusterManagerLogErr("n*** At least %d nodes are required.n",
                             3 * (replicas + 1));
        return 0;
    }
    clusterManagerLogInfo(">>> Performing hash slots allocation "
                          "on %d nodes...n", node_len);
    int interleaved_len = 0, ip_count = 0;
    clusterManagerNode **interleaved = zcalloc(node_len*sizeof(**interleaved));
    char **ips = zcalloc(node_len * sizeof(char*));
    clusterManagerNodeArray *ip_nodes = zcalloc(node_len * sizeof(*ip_nodes));
    listIter li;
    listNode *ln;
    listRewind(cluster_manager.nodes, &li);
    // 遍历redis 实例节点
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *n = ln->value;
        int found = 0;
        // 统计 实例ip,保存到ips 中
        for (i = 0; i < ip_count; i++) {
            char *ip = ips[i];
            if (!strcmp(ip, n->ip)) {
                found = 1;
                break;
            }
        }
        if (!found) {
            ips[ip_count++] = n->ip;
        }
        // ips: index->ip
        // ip_nodes: index->node 列表
        clusterManagerNodeArray *node_array = &(ip_nodes[i]);
        if (node_array->nodes == NULL)
            clusterManagerNodeArrayInit(node_array, node_len);
        clusterManagerNodeArrayAdd(node_array, n);
    }
    // 根据ips 统计信息,重新排序node 列表
    // 比如 ip1@port1, ip1@port2, ip1@port3, ip2@port1, ip2@port2, ip3@port1
    // 排序后列表为: ip1@port1, ip2@port1,ip3@port1,ip1@port2, ip2@port2,ip1@port3
    // 即同个ip的node 尽量不相邻
    while (interleaved_len < node_len) {
        for (i = 0; i < ip_count; i++) {
            clusterManagerNodeArray *node_array = &(ip_nodes[i]);
            if (node_array->count > 0) {
                clusterManagerNode *n = NULL;
                clusterManagerNodeArrayShift(node_array, &n);
                interleaved[interleaved_len++] = n;
            }
        }
    }
    // 获取排在前面的masters_count 为master
    clusterManagerNode **masters = interleaved;
    // 其他保留作为未分配node 列表(slave)
    interleaved += masters_count;
    interleaved_len -= masters_count;
    // 根据master_count, 分配slot
    // #define CLUSTER_MANAGER_SLOTS 16384
    float slots_per_node = CLUSTER_MANAGER_SLOTS / (float) masters_count;
    long first = 0;
    float cursor = 0.0f;
    for (i = 0; i < masters_count; i++) {
        // 为每个master node 分配slot
        clusterManagerNode *master = masters[i];
        long last = lround(cursor + slots_per_node - 1);
        if (last > CLUSTER_MANAGER_SLOTS || i == (masters_count - 1))
            last = CLUSTER_MANAGER_SLOTS - 1;
        if (last < first) last = first;
        printf("Master[%d] -> Slots %ld - %ldn", i, first, last);
        master->slots_count = 0;
        for (j = first; j <= last; j++) {
            master->slots[j] = 1;
            master->slots_count++;
        }
        master->dirty = 1;
        first = last + 1;
        cursor += slots_per_node;
    }

    /* Rotating the list sometimes helps to get better initial
     * anti-affinity before the optimizer runs. */

     // 未分配node 列表左旋一位,减少master分配slvae时的亲和性
    clusterManagerNode *first_node = interleaved[0];
    for (i = 0; i < (interleaved_len - 1); i++)
        interleaved[i] = interleaved[i + 1];
    interleaved[interleaved_len - 1] = first_node;
    int assign_unused = 0, available_count = interleaved_len;
assign_replicas:
    for (i = 0; i < masters_count; i++) {
        clusterManagerNode *master = masters[i];
        int assigned_replicas = 0;
        // 为每个master 节点分配slave 节点
        while (assigned_replicas < replicas) {
            if (available_count == 0break;
            clusterManagerNode *found = NULL, *slave = NULL;
            int firstNodeIdx = -1;
            for (j = 0; j < interleaved_len; j++) {
                clusterManagerNode *n = interleaved[j];
                if (n == NULLcontinue;
                // 找到不同ip的未分配节点
                if (strcmp(n->ip, master->ip)) {
                    found = n;
                    interleaved[j] = NULL;
                    break;
                }
                if (firstNodeIdx < 0) firstNodeIdx = j;
            }
            if (found) slave = found;
            // 没找到,取剩余未分配第一个节点作为slave
            else if (firstNodeIdx >= 0) {
                slave = interleaved[firstNodeIdx];
                interleaved_len -= (interleaved - (interleaved + firstNodeIdx));
                interleaved += (firstNodeIdx + 1);
            }
            if (slave != NULL) {
                assigned_replicas++;
                available_count--;
                if (slave->replicate) sdsfree(slave->replicate);
                slave->replicate = sdsnew(master->name);
                slave->dirty = 1;
            } else break;
            printf("Adding replica %s:%d to %s:%dn", slave->ip, slave->port,
                   master->ip, master->port);
            // 每个master分配足额slave, 每次每个maste只增加一个slave
            if (assign_unused) break;
        }
    }
    // 每个master分配足额slave后, 还有剩余,goto回去继续分配
    if (!assign_unused && available_count > 0) {
        assign_unused = 1;
        printf("Adding extra replicas...n");
        goto assign_replicas;
    }
    // 释放内存
    for (i = 0; i < ip_count; i++) {
        clusterManagerNodeArray *node_array = ip_nodes + i;
        clusterManagerNodeArrayReset(node_array);
    }
    clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);

    // 显示(打印分配)集群节点情况
    clusterManagerShowNodes();
    int ignore_force = 0;
    // 询问用户是否认可创建集群
    if (confirmWithYes("Can I set the above configuration?", ignore_force)) {
        // 用户输入yes,确认集群创建
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            // 遍历所有实例节点
            clusterManagerNode *node = ln->value;
            char *err = NULL;
            // 对master 节点分配slot
            // 对slave 节点建立主从关系
            int flushed = clusterManagerFlushNodeConfig(node, &err);
            if (!flushed && node->dirty && !node->replicate) {
                // slave 没执行成功,暂时跳过
                if (err != NULL) {
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
                    zfree(err);
                }
                success = 0;
                goto cleanup;
            } else if (err != NULL) zfree(err);
        }
        clusterManagerLogInfo(">>> Nodes configuration updatedn");
        clusterManagerLogInfo(">>> Assign a different config epoch to "
                              "each noden");
        int config_epoch = 1;
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
        // 遍历redis 实例,调用cluster set-config-epoch
        // 设置config-epoch(redis集群配置版本概念)
            clusterManagerNode *node = ln->value;
            redisReply *reply = NULL;
            reply = CLUSTER_MANAGER_COMMAND(node,
                                            "cluster set-config-epoch %d",
                                            config_epoch++);
            if (reply != NULL) freeReplyObject(reply);
        }
        clusterManagerLogInfo(">>> Sending CLUSTER MEET messages to join "
                              "the clustern");
        clusterManagerNode *first = NULL;
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            // 遍历集群所有节点
            clusterManagerNode *node = ln->value;
            if (first == NULL) {
                // 保存第一个节点
                first = node;
                continue;
            }
            // 向第一个节点发送 cluster meet
            // 告诉第一个节点 去与其他每个节点建立集群关系
            redisReply *reply = NULL;
            reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d",
                                            first->ip, first->port);
            int is_err = 0;
            if (reply != NULL) {
                if ((is_err = reply->type == REDIS_REPLY_ERROR))
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, reply->str);
                freeReplyObject(reply);
            } else {
                is_err = 1;
                fprintf(stderr"Failed to send CLUSTER MEET command.n");
            }
            if (is_err) {
                success = 0;
                goto cleanup;
            }
        }
        /* Give one second for the join to start, in order to avoid that
         * waiting for cluster join will find all the nodes agree about
         * the config as they are still empty with unassigned slots. */

        sleep(1);
        /* Wait until the cluster configuration is consistent. */
        // 等待所有集群节点建立共识,也就是集群状态一致
        // 比如那些是master,那些是slvae, slot分配情况
        // 底层核心是对每个节点电调用CLUSTER NODES
        // 获取到的信息两两匹配一致
        // 不一致就一直 sleep(1);,然后调用CLUSTER NODES,继续比较
        clusterManagerWaitForClusterJoin();
        /* Useful for the replicas */
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;
            if (!node->dirty) continue;
            char *err = NULL;
            // 对之前slave 没执行成功,继续执行,与master建立主从复制
            int flushed = clusterManagerFlushNodeConfig(node, &err);
            if (!flushed && !node->replicate) {
                if (err != NULL) {
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
                    zfree(err);
                }
                success = 0;
                goto cleanup;
            }
        }
        // Reset Nodes
        listRewind(cluster_manager.nodes, &li);
        clusterManagerNode *first_node = NULL;
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;
            if (!first_node) first_node = node;
            else freeClusterManagerNode(node);
        }
        listEmpty(cluster_manager.nodes);
        // 根据第一个节点获取集群信息
        if (!clusterManagerLoadInfoFromNode(first_node, 0)) {
            success = 0;
            goto cleanup;
        }
        // 对集群状态做最后检查
        // 检查跟节点配置一致性
        // 检查是否存在slot没有节点复杂,即open slot问题
        clusterManagerCheckCluster(0);
    }
cleanup:
    /* Free everything */
    zfree(masters);
    zfree(ips);
    for (i = 0; i < node_len; i++) {
        clusterManagerNodeArray *node_array = ip_nodes + i;
        CLUSTER_MANAGER_NODE_ARRAY_FREE(node_array);
    }
    zfree(ip_nodes);
    return success;
}

// 打印集群节点信息
static void clusterManagerShowNodes(void) {
    listIter li;
    listNode *ln;
    listRewind(cluster_manager.nodes, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *node = ln->value;
        sds info = clusterManagerNodeInfo(node, 0);
        printf("%sn", (char *) info);
        sdsfree(info);
    }
}

/* Flush the dirty node configuration by calling replicate for slaves or
* adding the slots defined in the masters. */

// 集群节点处理
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) {
    if (!node->dirty) return 0;
    redisReply *reply = NULL;
    int is_err = 0, success = 1;
    if (err != NULL) *err = NULL;
    if (node->replicate != NULL) {
        // 是slave 节点
        // 发生CLUSTER REPLICATE 命令 跟master建立主从关系
        reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER REPLICATE %s",
                                        node->replicate);
        if (reply == NULL || (is_err = (reply->type == REDIS_REPLY_ERROR))) {
            if (is_err && err != NULL) {
                *err = zmalloc((reply->len + 1) * sizeof(char));
                strcpy(*err, reply->str);
            }
            success = 0;
            /* If the cluster did not already joined it is possible that
             * the slave does not know the master node yet. So on errors
             * we return ASAP leaving the dirty flag set, to flush the
             * config later. */

            goto cleanup;
        }
    } else {
        // 是master 节点
        // 添加负责的 slot信息
        // 核心是调用 CLUSTER ADDSLOT
        int added = clusterManagerAddSlots(node, err);
        if (!added || *err != NULL) success = 0;
    }
    node->dirty = 0;
cleanup:
    if (reply != NULL) freeReplyObject(reply);
    return success;
}

clusterManagerCommandCreate整体流程:

  1. 对每个redis实例建立链接

  2. 检查是否已经在其它集群中或者已经保存了key

  3. 根据节点数量和设置的副本数,计算master 数量,并且检查是否少于3个

  4. 根据ips 统计信息,重新排序node 列表, 同个ip节点尽量不相邻

  5. 截取排完序的master_count个作为master 节点

  6. 为master 节点分配slot

  7. 为master 节点分配对应的slave

  8. 打印集群节点和slot 分配请求,询问是否安装这个配置创建集群

  9. 用户同意创建集群,遍历集群节点: 对master 节点分配slot;对slave 节点建立主从关系

  10. 遍历redis 实例节点,调用cluster set-config-epoch

  11. 向第一个节点发送 cluster meet, meet 每个节点,即与其他节点建立ping/pong 联系

  12. 等待所有集群节点建立共识,也就是集群状态一致

  13. 对之前slave 建立主从关系 失败的节点,继续执行,与master建立主从复制

  14. 根据第一个节点获取集群信息,检查集群的状态, 包括节点配置一致性 和 open slot 问题

原文始发于微信公众号(吃瓜技术派):Redis 集群创建流程与源码实现剖析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/236065.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!