集群

集群是Redis提供的分布式数据库方案,通过分片(sharding)来进行数据共享,并提供复制和故障转移功能

数据结构

// Cluster state information kept by a node.
typedef struct clusterState {
    // Pointer to the current node
    clusterNode *myself;  /* This node */
    // Current configuration epoch of the cluster, used for failover
    uint64_t currentEpoch;
    // Current state of the cluster (online / offline)
    int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */
    // Number of nodes in the cluster that handle at least one slot
    int size;             /* Num of master nodes with at least one slot */
    // Known cluster nodes, looked up by node name
    dict *nodes;          /* Hash table of name -> clusterNode structures */
    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
    // Slot migration state.
    // NOTE(review): these arrays are presumably indexed by slot number -- confirm.
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];
    clusterNode *importing_slots_from[CLUSTER_SLOTS];
    // Slot assignment table for the whole cluster (one entry per slot)
    clusterNode *slots[CLUSTER_SLOTS];
    // NOTE(review): presumably the number of keys stored in each slot -- confirm.
    uint64_t slots_keys_count[CLUSTER_SLOTS];
    // NOTE(review): presumably a radix tree mapping slots to their keys -- confirm.
    rax *slots_to_keys;
    /* The following fields are used to take the slave state on elections. */
    mstime_t failover_auth_time; /* Time of previous or next election. */
    int failover_auth_count;    /* Number of votes received so far. */
    int failover_auth_sent;     /* True if we already asked for votes. */
    int failover_auth_rank;     /* This slave rank for current auth request. */
    uint64_t failover_auth_epoch; /* Epoch of the current election. */
    int cant_failover_reason;   /* Why a slave is currently not able to
                                   failover. See the CANT_FAILOVER_* macros. */
    /* Manual failover state in common. */
    mstime_t mf_end;            /* Manual failover time limit (ms unixtime).
                                   It is zero if there is no MF in progress. */
    /* Manual failover state of master. */
    clusterNode *mf_slave;      /* Slave performing the manual failover. */
    /* Manual failover state of slave. */
    long long mf_master_offset; /* Master offset the slave needs to start MF
                                   or zero if still not received. */
    int mf_can_start;           /* If non-zero signal that the manual failover
                                   can start requesting masters vote. */
    /* The following fields are used by masters to take state on elections. */
    uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */
    int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
    /* Messages received and sent by type. */
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
    long long stats_pfail_nodes;    /* Number of nodes in PFAIL status,
                                       excluding nodes without address. */
} clusterState;
// Records the state of a single cluster node.
typedef struct clusterNode {
    // Time the node was created
    mstime_t ctime; /* Node object creation time. */
    // Node name, made up of 40 hexadecimal characters
    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
    // Node flags: record the role (master/slave) and status (online/offline)
    int flags;      /* CLUSTER_NODE_... */
    // Configuration epoch, used for failover
    uint64_t configEpoch; /* Last configEpoch observed for this node */
    // Slots handled by this node, stored as a bit array: 16384/8 = 2048 bytes
    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
    // Number of slots handled by this node
    int numslots;   /* Number of slots handled by this node */
    // Number of slave nodes (if this node is a master)
    int numslaves;  /* Number of slave nodes, if this is a master */
    // Array of pointers to the slave nodes
    struct clusterNode **slaves; /* pointers to slave nodes */
    // Pointer to the master node
    struct clusterNode *slaveof; /* pointer to the master node. Note that it
                                    may be NULL even if the node is a slave
                                    if we don't have the master node in our
                                    tables. */
    // Timestamps of various events (ping/pong/fail/voted/repl_offset/orphaned)
    mstime_t ping_sent;      /* Unix time we sent latest ping */
    mstime_t pong_received;  /* Unix time we received the pong */
    mstime_t fail_time;      /* Unix time when FAIL flag was set */
    mstime_t voted_time;     /* Last time we voted for a slave of this master */
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */
    mstime_t orphaned_time;     /* Starting time of orphaned master condition */
    long long repl_offset;      /* Last known repl offset for this node. */
    // IP address and ports of the node
    char ip[NET_IP_STR_LEN];  /* Latest known IP address of this node */
    int port;                   /* Latest known clients port of this node */
    int cport;                  /* Latest known cluster port of this node. */
    // Connection to the node
    clusterLink *link;          /* TCP/IP link with this node */
    list *fail_reports;         /* List of nodes signaling this as failing */
} clusterNode;
// Holds the information needed for a connection to a node.
typedef struct clusterLink {
    // Time the link was created
    mstime_t ctime;             /* Link creation time */
    // TCP socket descriptor
    int fd;                     /* TCP socket file descriptor */
    // Output buffer
    sds sndbuf;                 /* Packet send buffer */
    // Input buffer
    sds rcvbuf;                 /* Packet reception buffer */
    // Node associated with this link
    struct clusterNode *node;   /* Node related to this link if any, or NULL */
} clusterLink;
集群通信

CLUSTER MEET命令

客户端向节点A发送CLUSTER MEET命令,将节点B加入到节点A的集群。节点A会与节点B进行握手(handshake),来确认彼此的存在

CLUSTER MEET <ip> <port>
  1. 节点A为节点B创建一个clusterNode结构,并将该结构加入到自己的clusterState.nodes字典里面
  2. 节点A向节点B发送一条MEET消息
  3. 节点B接收到节点A发送的MEET消息,为节点A创建一个clusterNode结构,并将该结构加入到自己的clusterState.nodes字典里面
  4. 节点B向节点A返回一条PONG消息
  5. 节点A收到节点B返回的PONG消息
  6. 节点A向节点B发送一条PING消息
  7. 节点B收到节点A发送的PING消息,握手完成
  8. 节点A将节点B的信息通过Gossip协议传播给集群中的其他节点,让其他节点也与节点B进行握手

槽指派

集群的整个数据库被分为16384个槽(slot),每个键都属于这16384个槽的其中一个,每个节点可以处理0个到最多16384个槽。当所有16384个槽都有节点在处理时,集群处于上线状态(ok);反之,只要有任何一个槽没有得到处理,集群就处于下线状态(fail)

使用CLUSTER ADDSLOTS命令指派槽

CLUSTER ADDSLOTS <slot> [slot ...]

slots属性是一个二进制位数组(bit array),这个数组的长度为16384/8=2048个字节,共包含16384个二进制位。slots数组在索引i上的二进制位的值为1,表示该节点负责处理槽i;值为0,则表示该节点不负责处理槽i

一个节点会将自己负责处理的槽记录在clusterNode结构的slots属性和numslots属性,还会将自己的slots数组通过消息发送给集群中的其他节点

clusterState结构中的slots记录了集群中所有16384个槽的指派信息,查找某个槽被指派给了哪个节点的时间复杂度为O(1)

集群命令

// 集群(cluster)
// 打印集群的信息
cluster info
// 列出集群当前已知的所有节点(node),以及这些节点的相关信息
cluster nodes

// 节点(node)
// 将ip和port所指定的节点添加到集群当中,让它成为集群的一份子
cluster meet <ip> <port>
// 从集群中移除node_id指定的节点
cluster forget <node_id>
// 将当前从节点设置为master_node_id指定的master节点的slave节点(只能针对slave节点操作)
cluster replicate <master_node_id>
// 将节点的配置文件保存到硬盘里面
cluster saveconfig

// 槽(slot)
// 将一个或多个槽(slot)指派(assign)给当前节点
cluster addslots <slot> [slot ...]
// 移除一个或多个槽对当前节点的指派
cluster delslots <slot> [slot ...]
// 移除指派给当前节点的所有槽,让当前节点变成一个没有指派任何槽的节点
cluster flushslots
// 将槽slot指派给node_id指定的节点,如果槽已经指派给另一个节点,那么先让另一个节点删除该槽,然后再进行指派
cluster setslot <slot> node <node_id>
// 将本节点的槽slot迁移到node_id指定的节点中
cluster setslot <slot> migrating <node_id>
// 从node_id指定的节点中导入槽slot到本节点
cluster setslot <slot> importing <node_id>
// 取消对槽slot的导入(import)或者迁移(migrate)
cluster setslot <slot> stable

// 键(key)
// 计算键key应该被放置在哪个槽上
cluster keyslot <key>
// 返回槽slot目前包含的键值对数量
cluster countkeysinslot <slot>
// 返回槽slot中的最多count个键
cluster getkeysinslot <slot> <count>