【Redis】集群请求命令处理

集群请求命令处理

在Redis的命令处理函数processCommand(server.c)中有对集群节点的处理,满足以下条件时进入集群节点处理逻辑中:

  1. 启用了集群模式,通过server.cluster_enabled判断
  2. 发送命令的节点不是主节点
  3. 收到的命令中包含了key参数或者命令是EXEC,EXEC命令与MULTI结合使用,用于执行事务

条件三的判断条件有些绕,!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0意味着命令中没有key参数,c->cmd->proc != execCommand表示当前命令不是EXEC,然后对(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)整体做了取反操作,那么看以下两种情况:

  • 如果命令中带有Key,那么!cmdHasMovableKeys(c->cmd)就已返回false,又因为对整体做了取反操作,所以条件成立,意味着收到命令中带有Key时需要执行重定向处理
  • 如果收到的命令是EXEC,c->cmd->proc != execCommand返回false,对整体取反变成true,所以条件也成立,意味着收到EXEC命令的时候执行重定向处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
int processCommand(client *c) {

// 省略...

/* 如果启用了集群且发送命令的节点不是主节点,并且收到的命令中包含了key参数或者命令是EXEC时 */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
// 查询节点
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
// 重定向
clusterRedirectClient(c,n,hashslot,error_code);
c->cmd->rejected_calls++;
return C_OK;
}
}

// 省略...

return C_OK;
}


/* 如果参数中有key将会返回1 */
static int cmdHasMovableKeys(struct redisCommand *cmd) {
return (cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
cmd->flags & CMD_MODULE_GETKEYS;
}

MULTI命令的处理

上面说到,如果是EXEC命令时,也会进入到集群节点处理逻辑,EXEC命令一般与MULTI结合使用,用于执行事务。比如以下例子中,使用MULTI开启事务,执行对a账户增1,b账户减1的操作,可以看到返回结果为QUEUED,命令被缓存起来,直到执行EXEC命令,Redis才开始提交命令:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> INCR a:account
QUEUED
127.0.0.1:6379> DECR b:account
QUEUED
127.0.0.1:6379> EXEC
1) (integer) 1
2) (integer) -1

由于集群也需要对EXEC命令处理,所以先看一下MULTI命令的处理逻辑,MULTI命令对应的执行函数为multiCommand,可以看到它在处理的时候为客户端设置了CLIENT_MULTI标记

1
2
3
4
5
6
7
8
9
10
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
// 设置CLIENT_MULTI标记
c->flags |= CLIENT_MULTI;

addReply(c,shared.ok);
}

在Redis的命令处理函数中可以找到对CLIENT_MULTI的处理逻辑,如果客户端标记中有CLIENT_MULTI,并且当前命令不是EXEC、DISCARD、MULTI、WATCH和RESET,将调用queueMultiCommand函数,对命令进行缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int processCommand(client *c) {

// 省略...

/* 处理MULTI命令 */
/* 如果客户端标记中有CLIENT_MULTI,并且当前命令不是EXEC、DISCARD、MULTI、WATCH和RESET */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
c->cmd->proc != resetCommand)
{
queueMultiCommand(c); // 加入到multi队列中,先将命令缓存
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}

MULTI命令结构体定义

在客户端结构体定义中,可以看到使用了multiState缓存MULTI命令:

1
2
3
4
5
6
// 客户端
typedef struct client {
// ...
multiState mstate; /* 存储MULTI/EXEC命令的结构体 */
// ...
}

multiState

MULTI命令对应的结构体为multiStatemultiState中使用了multiCmd结构体来缓存具体的命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct multiState {
multiCmd *commands; /* MULTI命令数组 */
int count; /* 缓存的命令个数 */
int cmd_flags; /* 命令标记 */
int cmd_inv_flags; /* 与cmd_flags一致 */
} multiState;

/* multi命令 */
typedef struct multiCmd {
robj **argv;
int argc;
struct redisCommand *cmd; /* 命令 */
} multiCmd;

MULTI命令的缓存

queueMultiCommand

对MULTI命令缓存的处理在queueMultiCommand函数中,它在multi.c文件中定义:

  1. multiCmd加入到缓存数组c->mstate.commands中,对命令进行缓存
  2. 将当前命令的内容设置到multiCmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 将当前命令加入到MULTI命令中 */
void queueMultiCommand(client *c) {
// MULTI命令
multiCmd *mc;
int j;
if (c->flags & CLIENT_DIRTY_EXEC)
return;
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
// 到加入MULTI数组中
mc = c->mstate.commands+c->mstate.count;
// 设置命令
mc->cmd = c->cmd;
// 设置参数
mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc);
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
// 缓存的命令数加1
c->mstate.count++;
// 设置客户端标记
c->mstate.cmd_flags |= c->cmd->flags;
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}

查询节点

getNodeByQuery

getNodeByQuery函数用于根据KEY查询数据所在的节点,处理逻辑如下:

  1. 如果是EXEC命令,从客户端获取multiState,multiState中缓存了MULTI命令,如果不是MULTI命令,而是单个命令,同样使用multiState来存放命令,之后就可以统一使用multiState来获取请求中的命令

  2. 根据命令的个数进行遍历,处理每一个命令

    (1)从命令中获取key的个数,处理每一个key

    (2)查询每一个key所在的slot

    (3)如果处理的是第一个key,根据所属slot获取所在的节点,记为n,有以下三种情况:

    ​ 情况一:未获取到节点(有可能节点已下线但是还未更新状态),记录错误信息为CLUSTER_REDIR_DOWN_UNBOUND,表示key未绑定到slot,返回NULL

    ​ 情况二:可以查找到节点,并且是当前节点自己,但是key所属slot正在做数据迁出操作(从当前节点迁出),此时将migrating_slot置为1

    ​ 情况三:可以查找到节点,并且不是当前节点自己,但是key所属slot正在迁入到当前节点,此时将importing_slot置为1

    (4)如果处理的不是第一个key,判断当前key所属的slot是否与第一个key的slot一致:

    ​ 情况一:如果不一致,表示不同的key所属的slot不同,将error_code置为CLUSTER_REDIR_CROSS_SLOT,返回NULL

    ​ 情况二:如果一致,将multiple_keys置为1,表示请求中有多个KEY,做一个标记

    (5)根据migrating_slotimporting_slot的值判断key所属slot是否正在迁出或者迁入,迁出意味着key对应的数据正在从当前节点迁出到其他节点,迁入意味着key对应的数据正在迁入到当前节点,由于数据未迁移完毕,所以这两种情况都需要检查key是否在当前节点的数据库中,如果不在意味着当前节点没有该key的数据,需要记录缺失的KEY的数量,missing_keys增1

  3. 根据第二步查询后的结果,进行如下处理:

    • 未查找到节点,也就是n为空,返回当前节点自己

    • 当前节点不处于正常状态(CLUSTER_OK)

      (1)如果未开启allow_reads_when_down(在节点下线时允许读),error_code置为CLUSTER_REDIR_DOWN_STATE,并返回NULL

      (2)当前命令中有写标记,error_code置为CLUSTER_REDIR_DOWN_RO_STATE,并返回NULL

      (3)非以上两种情况,表示开启了allow_reads_when_down,并且是读操作,所以当前节点依旧可以处理请求,继续往下执行

    • 如果数据正在迁出或者正在迁入,并且当前命令是MIGRATE数据迁移的命令,返回当前节点

    • 如果key所在slot数据正在从当前节点迁出,并且当前节点数据库中有缺失的key,error_code置为CLUSTER_REDIR_ASK并返回迁出到的那个节点

    • 如果key所在slot正在迁入到当前节点,并且当前命令是ASK ,此时如果请求中有多个KEY并且当前节点存在缺失的KEY,表示有些key不在当前节点,error_code置为CLUSTER_REDIR_UNSTABLE返回NULL,否则返回当前节点即可

    • 如果客户端有只读标记、 当前命令不是写命令、当前节点是从节点并且它的主节点是根据key所属slot查找到的节点,返回当前节点,因为从节点数据是从master节点同步的,而master节点正是要查找的节点,从节点也可以处理读请求

    • 如果查询到的节点不是当前节点,将error_code置为CLUSTER_REDIR_MOVED,表示数据已经移动到其他节点,此时返回key所属slot对应的实际节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
// 集群节点
clusterNode *n = NULL;
// 记录命令中的第一个KEY
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return myself;

if (error_code) *error_code = CLUSTER_REDIR_NONE;

/* 如果是EXEC命令 */
if (cmd->proc == execCommand) {
/* 校验是否有CLIENT_MULTI标记 */
if (!(c->flags & CLIENT_MULTI)) return myself;
// 获取multiState
ms = &c->mstate;
} else {
/* 如果不是MULTI命令,而是单个命令,同样使用multiState来存储命令 */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1; // 命令个数设置为1
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd; // 设置命令
}

/* 根据命令的个数进行遍历,处理每一个命令 */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, *keyindex, numkeys, j;

mcmd = ms->commands[i].cmd; // 获取命令
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;

getKeysResult result = GETKEYS_RESULT_INIT;
// 从命令中获取key的个数
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
// 遍历每一个key
for (j = 0; j < numkeys; j++) {
// 获取key
robj *thiskey = margv[keyindex[j]];
// 查询key所在的slot
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
// 如果是第一个key
if (firstkey == NULL) {
/* 将第一个key记录在firstkey */
firstkey = thiskey;
// 记录slot
slot = thisslot;
// 根据slot获取集群节点
n = server.cluster->slots[slot];

/* 如果未获取到节点(有可能节点已下线),记录错误信息,返回NULL */
if (n == NULL) {
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}

/* 如果根据slot查到的节点是当前节点自己,并且slot正在做数据迁出操作 */
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1; // migrating_slot置为1,标记正在做数据迁出操作
} else if (server.cluster->importing_slots_from[slot] != NULL) {
// 如果key所属的slot正在做数据迁入操作,importing_slot置为1
importing_slot = 1;
}
} else {
/* 如果不是第一个key*/
if (!equalStringObjects(firstkey,thiskey)) {
// 如果和第一个key的slot不一致,error_code置为CLUSTER_REDIR_CROSS_SLOT
if (slot != thisslot) {

getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT; /* 不同的key所属不同的slot */
return NULL;
} else {
/* 标记请求中有多个KEY */
multiple_keys = 1;
}
}
}

/* 如果slot正在迁入或者迁出,检查key是否在当前节点的db中,如果不在记录缺失的KEY的数量 */
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
{
missing_keys++;
}
}
getKeysFreeResult(&result);
}

/* 如果未查到,返回当前节点自己 */
if (n == NULL) return myself;

/* 如果当前节点的状态不是CLUSTER_OK状态,节点可能处于异常状态,只有在开启了allow_reads_when_down(在节点下线时允许读)并且当前命令是读操作才继续往下处理,否则记录错误信息返回NULL */
if (server.cluster->state != CLUSTER_OK) {
// 如果设置了节点下线时不允许读
if (!server.cluster_allow_reads_when_down) {
/* 记录错误信息,返回NULL */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
} else if (cmd->flags & CMD_WRITE) { // 如果命令中有写标记
/* The cluster is configured to allow read only commands */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
return NULL;
} else {
/* Fall through and allow the command to be executed:
* this happens when server.cluster_allow_reads_when_down is
* true and the command is not a write command */
}
}

/* 更新hashslot */
if (hashslot) *hashslot = slot;

/* 如果数据正在迁出或者正在迁入,并且当前命令是MIGRATE数据迁移的命令,返回当前节点 */
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
return myself;

/* 如果key所在slot数据正在迁出,并且当前节点数据库中有缺失的key*/
if (migrating_slot && missing_keys) {
// error_code设置为CLUSTER_REDIR_ASK
if (error_code) *error_code = CLUSTER_REDIR_ASK;
// 返回迁出到的那个节点
return server.cluster->migrating_slots_to[slot];
}

/* 如果key所在slot正在做数据迁入,并且当前命令是ASK */
if (importing_slot &&
(c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
{
// 如果请求中有多个KEY并且有当前节点数据库中有缺失的key
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
// 返回当前节点
return myself;
}
}

/* 是否是写命令 */
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
// 如果客户端有只读标记、当前命令不是写命令,当前节点是从节点并且它的主节点是根据key所属slot查找到节点
if (c->flags & CLIENT_READONLY &&
!is_write_command &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
// 返回当前节点即可
return myself;
}

/* 如果查询到的节点不是当前节点,将error_code置为CLUSTER_REDIR_MOVED,返回key所属slot对应的实际节点 */
if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
}

根据Key从DB中查询Value

redisDb

Redis数据库对应的结构体定义为redisDb,里面有个字典类型的对象,存储键值对数据:

1
2
3
4
typedef struct redisDb {
dict *dict; /* 存储的键值对数据 */
// 省略...
} redisDb;

lookupKeyRead

lookupKeyRead函数用于从redisDb中根据key查找数据,最终是调用lookupKey函数完成的,根据Key从字典中查找并返回value:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
robj *lookupKeyRead(redisDb *db, robj *key) {
// 调用lookupKeyReadWithFlags查找
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}

robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
// 调用lookupKey函数查找
return lookupKey(db,key,flags);
}

robj *lookupKey(redisDb *db, robj *key, int flags) {
// 根据KEY从字典中进行查找
dictEntry *de = dictFind(db->dict,key->ptr);
// 如果不为空
if (de) {
robj *val = dictGetVal(de);
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
// 返回value
return val;
} else {
return NULL;
}
}

集群重定向

clusterRedirectClient

clusterRedirectClient用于集群重定向处理,在getNodeByQuery函数中,根据查询节点的情况对error_code设置了不同的值,在clusterRedirectClient函数中可以看到对error_code的判断,根据error_code的不同,向客户端响应不同的内容:

  1. 如果error_code是CLUSTER_REDIR_CROSS_SLOT,表示请求中有多个KEY,但是KEY所属slot不在同一个slot中
  2. 如果error_code是CLUSTER_REDIR_UNSTABLE,表示请求中有多个KEY并且在一个slot,但是数据可能正在迁入或迁出的过程中,节点中有缺失的KEY,slot处于一个不稳定的状态
  3. 如果error_code是CLUSTER_REDIR_DOWN_STATE,表示节点处于下线状态
  4. 如果error_code是CLUSTER_REDIR_DOWN_RO_STATE,表示节点处于下线状态,只接收读命令
  5. 如果error_code是CLUSTER_REDIR_DOWN_UNBOUND,标识key未绑定到节点,也就是根据key所属slot未查询到节点
  6. 如果error_code是CLUSTER_REDIR_MOVED或者CLUSTER_REDIR_ASK
    • CLUSTER_REDIR_MOVED表示key所属slot已从当前节点迁出,此时向客户端响应MOVED命令并将迁出后slot以及所在节点ip和端口返回
    • CLUSTER_REDIR_ASK表示key所属slot正在从当前节点迁出的过程中,请求中的key有可能一部分还未迁出,一部分已经迁出完毕,此时向客户端返回ASK命令,并将slot以及迁出到的目标节点的ip和端口返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
// 如果是CLUSTER_REDIR_CROSS_SLOT,向客户端回复key不在同一个slot中
addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
} else if (error_code == CLUSTER_REDIR_UNSTABLE) {
/* 请求中有多个key并且在一个slot,但是数据可能正在迁入或迁出的过程中,slot并不稳定 */
addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
// 节点处于下线状态
addReplyError(c,"-CLUSTERDOWN The cluster is down");
} else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
// 节点已经下线只接收读命令
addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
} else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
// 如果是CLUSTER_REDIR_DOWN_UNBOUND,表示根据key所属slot未查询到节点
addReplyError(c,"-CLUSTERDOWN Hash slot not served");
} else if (error_code == CLUSTER_REDIR_MOVED ||
error_code == CLUSTER_REDIR_ASK)
{
/* 如果是MOVED或者ASK,需要进行请求重定向处理,向客户端返回ASK或者MOVED命令,并将目标节点的ip和端口返回 */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
int port = use_pport && n->pport ? n->pport : n->port;
// 返回响应,包括ASK或者MOVED命令、slot信息、目标节点的ip端口
addReplyErrorSds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot, n->ip, port));
} else {
serverPanic("getNodeByQuery() unknown error.");
}
}

参考

极客时间 - Redis源码剖析与实战(蒋德钧)
zhaiguanjie-Redis源码剖析

Redis版本:redis-6.2.5