【Redis】集群数据迁移

Redis通过对KEY计算hash,将KEY映射到slot,集群中每个节点负责一部分slot的方式管理数据,slot最大个数为16384。
在集群节点对应的结构体变量clusterNode中可以看到slots数组,数组的大小为CLUSTER_SLOTS除以8,CLUSTER_SLOTS的值是16384:

1
2
3
4
5
6
7
8
9
#define CLUSTER_SLOTS 16384

typedef struct clusterNode {

unsigned char slots[CLUSTER_SLOTS/8];

// 省略...

} clusterNode;

因为一个字符占8位,所以数组个数为16384除以8,每一位可以表示一个slot,如果某一位的值为1,表示当前节点负责这一位对应的slot。

clusterState

clusterNode里面保存了节点相关的信息,集群数据迁移信息并未保存在clusterNode中,而是使用了clusterState结构体来保存:

  • migrating_slots_to数组: 记录当前节点负责的slot迁移到了哪个节点
  • importing_slots_from数组: 记录当前节点负责的slot是从哪个节点迁入的
  • slots数组:记录每个slot是由哪个集群节点负责的
  • slots_keys_count:slot中key的数量
  • slots_to_keys:是一个字典树,记录KEY和SLOT的对应关系
1
2
3
4
5
6
7
8
9
typedef struct clusterState {
clusterNode *myself; /* 当前节点自己 */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
uint64_t slots_keys_count[CLUSTER_SLOTS];
rax *slots_to_keys;
// ...
} clusterState;

clusterState与clusterNode的关系

集群数据迁移

在手动进行数据迁移时,需要执行以下步骤:

  1. 在源节点和目标节点分别使用CLUSTER SETSLOT MIGRATINGCLUSTER SETSLOT IMPORTING标记slot迁出和迁入信息
  2. 在源节点使用CLUSTER GETKEYSINSLOT命令获取待迁出的KEY
  3. 在源节点执行MIGRATE命令进行数据迁移,MIGRATE既支持单个KEY的迁移,也支持多个KEY的迁移
  4. 在源节点和目标节点使用CLUSTER SETSLOT命令标记slot最终迁移节点

标记数据迁移节点

在进行数据迁移之前,首先在需要迁入的目标节点使用SETSLOT命令标记要将SLOT从哪个节点迁入到当前节点:

  • :哈希槽的值
  • :表示slot所在的节点
1
CLUSTER  SETSLOT  <slot>  IMPORTING  <node>

然后在源节点也就是slot所在节点使用MIGRATING命令将数据迁出到目标节点:

  • :哈希槽的值
  • :表示slot要迁出到的目标节点
1
CLUSTER  SETSLOT  <slot>  MIGRATING  <node>

比如slot1当前在node1中,需要将slot1迁出到node2,那么首先在nodd2上执行IMPORTING命令,标记slot准备从node1迁到当前节点node2中:

1
CLUSTER  SETSLOT  slot1  IMPORTING  node1

然后在node1中执行MIGRATING命令标记slot1需要迁移到node2:

1
CLUSTER  SETSLOT  slot1  MIGRATING  node2

clusterCommand
SETSLOT命令的处理在clusterCommand函数(cluster.c文件中)中:

  1. 校验当前节点是否是从节点,如果当前节点是从节点,返回错误,SETSLOT只能用于主节点
  2. 如果是migrating命令,表示slot需要从当前节点迁出到其他节点,处理如下:
    (1) 如果需要迁移的slot不在当前节点,返回错误
    (2)如果要迁移到的目标slot节点未查询到,返回错误
    (3)将当前节点的migrating_slots_to[slot]的值置为迁出到的目标节点,记录slot迁移到了哪个节点
  3. 如果是importing命令,表示slot需要从其他节点迁入到当前节点
    (1)如果要迁移的slot已经在当前节点,返回slot数据已经在当前节点的响应
    (2)由于importing需要从slot所在节点迁移到当前节点,如果未从集群中查询slot当前所在节点,返回错误信息
    (3)将当前节点的importing_slots_from[slot]置为slot所在节点,记录slot是从哪个节点迁入到当前节点的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
void clusterCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}

if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
// ...
}
// ...
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { // 处理setslot命令
int slot;
clusterNode *n;
// 如果当前节点是从节点,返回错误,SETSLOT只能用于主节点
if (nodeIsSlave(myself)) {
addReplyError(c,"Please use SETSLOT only with masters.");
return;
}
// 查询slot
if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
// 处理migrating迁出
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
// 如果需要迁移的slot不在当前节点,返回错误
if (server.cluster->slots[slot] != myself) {
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
// 如果要迁移到的目标节点未查询到,返回错误
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
// 将当前节点的migrating_slots_to[slot]置为目标节点,记录slot要迁移到的节点
server.cluster->migrating_slots_to[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) { // 处理importing迁入
// 如果要迁移的slot已经在当前节点
if (server.cluster->slots[slot] == myself) {
addReplyErrorFormat(c,
"I'm already the owner of hash slot %u",slot);
return;
}
// importing需要从slot所在节点迁移到当前节点,如果未从集群中查询slot当前所在节点,返回错误信息
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
// 记录slot是从哪个节点迁移过来的
server.cluster->importing_slots_from[slot] = n;
}
// 省略其他if else
// ...
else {
addReplyError(c,
"Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
return;
}
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
addReply(c,shared.ok);
}
// ...
else {
addReplySubcommandSyntaxError(c);
return;
}
}

获取待迁出的key

在标记完迁入、迁出节点后,就可以使用CLUSTER GETKEYSINSLOT命令获取待迁出的KEY:

:哈希槽的值

:迁出KEY的数量

1
CLUSTER  GETKEYSINSLOT  <slot>  <count>

getkeysinslot命令的处理也在clusterCommand函数中,处理逻辑如下:

  1. 从命令中解析slot的值以及count的值,count的值记为maxkeys,并校验合法性
  2. 调用countKeysInSlot函数获取slot中key的数量,与maxkeys对比,如果小于maxkeys,就将maxkeys的值更新为slot中key的数量
  3. 根据获取key的个数分配相应的内存空间
  4. 从slot中获取key并将数据返回给客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
void clusterCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}

if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
// ...
}
// ...
else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
/* CLUSTER GETKEYSINSLOT <slot> <count> */
long long maxkeys, slot;
unsigned int numkeys, j;
robj **keys;
// 从命令中获取slot的值并转为长整型
if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
return;
// 从命令中获取key的最大个数并转为长整型
if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
!= C_OK)
return;
// 如果slot的值小于0或者大于CLUSTER_SLOTS或者key的最大个数为0
if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
addReplyError(c,"Invalid slot or number of keys");
return;
}

// 计算slot中key的数量
unsigned int keys_in_slot = countKeysInSlot(slot);
// 如果maxkeys大于slot中key的数量,更新maxkeys的值为slot中key的数量
if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
// 分配空间
keys = zmalloc(sizeof(robj*)*maxkeys);
// 从slot中获取key
numkeys = getKeysInSlot(slot, keys, maxkeys);
addReplyArrayLen(c,numkeys);
for (j = 0; j < numkeys; j++) {
// 返回key
addReplyBulk(c,keys[j]);
decrRefCount(keys[j]);
}
zfree(keys);
}
// ...
else {
addReplySubcommandSyntaxError(c);
return;
}
}

数据迁移

源节点数据迁移

完成上两步之后,接下来需要在源节点中执行MIGRATE命令进行数据迁移,MIGRATE既支持单个KEY的迁移,也支持多个KEY的迁移,语法如下:

1
2
3
4
5
# 单个KEY
MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password | AUTH2 username password]

# 多个KEY
MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password | AUTH2 username password] KEYS key2 ... keyN
  • host:ip地址
  • Port:端口
  • key:迁移的key
  • KEYS:如果一次迁移多个KEY,使用KEYS,后跟迁移的key1 … keyN
  • dbid:数据库id
  • COPY:如果目标节点已经存在迁移的key,则报错,如果目标节点不存在迁移的key,则正常进行迁移,在迁移完成后删除源节点中的key
  • REPLACE:如果目标节点不存在迁移的key,正常进行迁移,如果目标节点存在迁移的key,进行替换,覆盖目标节点中已经存在的key
  • AUTH:验证密码

migrateCommand

MIGRATE命令对应的处理函数在migrateCommand中(cluster.c文件中),处理逻辑如下:

  1. 解析命令中的参数,判断是否有replace、auth、keys等参数
    • 如果有replace参数,表示在迁移数据时如果key已经在目标节点存在,进行替换
    • 如果有keys参数,表示命令中有多个key,计算命令中key的个数记为num_keys
  2. 处理命令中解析到的所有key,调用lookupKeyRead函数查找key:
    • 如果查找到,将key放入kv对象中,kv中存储实际要处理的KEY,value放入ov对象中,ov中存储key对应的value
    • 如果未查找到key,跳过当前key,处理下一个key
  3. 因为有部分key可能未查询到,所以更新实际需要处理的key的数量num_keys
  4. 根据命令中的ip端口信息,与目标节点建立连接
  5. 调用rioInitWithBuffer函数初始化一块缓冲区
  6. 处理实际需要迁移的key,主要是将数据填入缓冲区
    • 根据key获取过期时间,如果已过期不进行处理
    • 判断是否开启了集群,如果开启了集群将RESTORE-ASKING写入缓冲区,如果未开启,写入RESTORE命令
    • 将key写入缓冲区
    • 调用createDumpPayload函数,创建payload,将RDB版本、CRC64校验和以及value内容写入 ,目标节点收到数据时需要进行校验
    • 将payload数据填充到缓冲区
  7. 将缓冲区的数据按照64K的块大小发送到目标节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
void migrateCommand(client *c) {

// 省略...

robj **ov = NULL; /* 保存要迁移的key对应的value */
robj **kv = NULL; /* 保存要迁移的key. */

int first_key = 3; /* 第一个key */
int num_keys = 1; /* 迁移key的数量 */

/* 解析命令中的参数 */
for (j = 6; j < c->argc; j++) {
int moreargs = (c->argc-1) - j;
// 如果是copy
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 如果是replace
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) { // 如果需要验证密码
if (!moreargs) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
j++;
// 获取密码
password = c->argv[j]->ptr;
redactClientCommandArgument(c,j);
} else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
// ...
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) { // 如果一次迁移多个key
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
return;
}
// 或取第一个key
first_key = j+1;
// 计算key的数量
num_keys = c->argc - j - 1;
break; /* All the remaining args are keys. */
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}

/* 校验timeout和dbid的值 */
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
{
return;
}
// 如果超时时间小于0,默认设置1000毫秒
if (timeout <= 0) timeout = 1000;

// 分配空间,kv记录在源节点中实际查找到的key
ov = zrealloc(ov,sizeof(robj*)*num_keys);
kv = zrealloc(kv,sizeof(robj*)*num_keys);
int oi = 0;
// 处理KEY
for (j = 0; j < num_keys; j++) {
// 如果可以从源节点查找到key
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
// 记录查找到的key
kv[oi] = c->argv[first_key+j];
// 记录查找到的个数
oi++;
}
}
// 只处理实际查找到的key
num_keys = oi;
// 如果为0,不进行处理
if (num_keys == 0) {
zfree(ov); zfree(kv); // 释放空间
addReplySds(c,sdsnew("+NOKEY\r\n")); // 返回NOKEY响应
return;
}

try_again:
write_error = 0;

/* 与目标节点建立连接 */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
zfree(ov); zfree(kv);
return; /* error sent to the client by migrateGetSocket() */
}
// 初始化缓冲区
rioInitWithBuffer(&cmd,sdsempty());

/* 如果密码不为空,验证密码 */
if (password) {
int arity = username ? 3 : 2;
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
if (username) {
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
sdslen(username)));
}
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
sdslen(password)));
}

// ...

// 处理KEY,只保留未过期的KEY
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
// 获取KEY的过期时间,返回-1表示未设置过期时间,否则返回过期时间
long long expireat = getExpire(c->db,kv[j]);
// 如果设置了过期时间
if (expireat != -1) {
// 计算ttl:过期时间减去当前时间
ttl = expireat-mstime();
// 如果已过期
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}

/* 记录未过期的KEY */
ov[non_expired] = ov[j];
kv[non_expired++] = kv[j];

serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
// 是否启用集群
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); // 将RESTORE-ASKING命令写入缓冲区
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); // 如果未开启集群将RESTORE命令写入缓冲区
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
// 将key写入缓冲区
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr)));
// 将ttl写入缓存区
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

/* 创建payload,将RDB版本、CRC64校验和以及value内容写入 */
createDumpPayload(&payload,ov[j],kv[j]);
// 将payload数据写入缓冲区
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);

/* 如果设置了REPLACE参数,将REPLACE写入缓冲区 */
if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
}

/* 更新实际需要处理的key */
num_keys = non_expired;

/* 将缓冲区的数据按照64K的块大小发送到目标节点 */
errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;

while ((towrite = sdslen(buf)-pos) > 0) {
// 需要发送的数据,如果超过了64K就按照64K的大小发送
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
// 发送数据
nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten;
}
}

// 省略...
}

createDumpPayload

createDumpPayload函数在cluster.c文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
void createDumpPayload(rio *payload, robj *o, robj *key) {
unsigned char buf[2];
uint64_t crc;
// 初始化缓冲区
rioInitWithBuffer(payload,sdsempty());
// 将value的数据类型写入缓冲区
serverAssert(rdbSaveObjectType(payload,o));
// 将value写入缓冲区
serverAssert(rdbSaveObject(payload,o,key));

/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
* ----------------+---------------------+---------------+
* RDB version and CRC are both in little endian.
*/

/* 设置RDB版本 */
buf[0] = RDB_VERSION & 0xff;
buf[1] = (RDB_VERSION >> 8) & 0xff;
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);

/* 设置CRC64校验和用于校验数据 */
crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
sdslen(payload->io.buffer.ptr));
memrev64ifbe(&crc);
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}

目标节点处理数据

restoreCommand

目标节点收到迁移的数据的处理逻辑在restoreCommand中(cluster.c文件中):

  1. 解析请求中的参数,判断是否有replace
  2. 如果没有replace并且key已经在当前节点存在,返回错误信息
  3. 调用verifyDumpPayload函数校验RDB版本和CRC校验和
  4. 从请求中解析value的数据类型和value值
  5. 如果设置了replace先删除数据库中存在的key
  6. 将key和vlaue添加到节点的数据库中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/* RESTORE key ttl serialized-value [REPLACE] */
void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
rio payload;
int j, type, replace = 0, absttl = 0;
robj *obj;

/* 解析请求中的参数 */
for (j = 4; j < c->argc; j++) {
int additional = c->argc-j-1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 如果有replace
replace = 1; // 标记
}
// ...
else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}

/* 如果没有replace并且key已经在数据库存在,返回错误信息 */
robj *key = c->argv[1];
if (!replace && lookupKeyWrite(c->db,key) != NULL) {
addReplyErrorObject(c,shared.busykeyerr);
return;
}

/* Check if the TTL value makes sense */
if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
return;
} else if (ttl < 0) {
addReplyError(c,"Invalid TTL value, must be >= 0");
return;
}

/* 校验RDB版本和CRC */
if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
{
addReplyError(c,"DUMP payload version or checksum are wrong");
return;
}

rioInitWithBuffer(&payload,c->argv[3]->ptr);
// 解析value的数据类型和value值
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}

int deleted = 0;
// 如果设置了replace
if (replace)
deleted = dbDelete(c->db,key); // 先删除数据库中存在的key
if (ttl && !absttl) ttl+=mstime();
if (ttl && checkAlreadyExpired(ttl)) {
if (deleted) {
rewriteClientCommandVector(c,2,shared.del,key);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
server.dirty++;
}
decrRefCount(obj);
addReply(c, shared.ok);
return;
}

/* 将key和vlaue添加到节点的数据库中 */
dbAdd(c->db,key,obj);
if (ttl) {
setExpire(c,c->db,key,ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
addReply(c,shared.ok);
server.dirty++;
}

标记迁移结果

数据迁移的最后一步,需要使用CLUSTER SETSLOT命令,在源节点和目标节点执行以下命令,标记slot最终所属的节点,并清除第一步中标记的迁移信息

:哈希槽

:哈希槽最终所在节点id

1
CLUSTER SETSLOT <slot> NODE <node>

clusterCommand

CLUSTER SETSLOT <slot> NODE <node>命令的处理依旧在clusterCommand函数中,处理逻辑如下:

  1. 根据命令中传入的nodeid查找节点记为n,如果未查询到,返回错误信息
  2. 果slot已经在当前节点,但是根据nodeid查找到的节点n不是当前节点,说明slot所属节点与命令中指定的节点不一致,返回错误信息
  3. 在源节点上执行命令时,如果slot中key的数量为0,表示slot上的数据都已迁移完毕,而migrating_slots_to[slot]记录了slot迁移到的目标节点,既然数据已经迁移完成此时需要将migrating_slots_to[slot]迁出信息清除
  4. 调用clusterDelSlot函数先将slot删除
    • 获取slot所属节点
    • 将slot所属节点ClusterNode结构体中的slots数组对应的标记位取消,表示节点不再负责此slot
    • 将slot所属节点ClusterState结构体中的slots数组对应元素置为NULL,表示当前slot所属节点为空
  5. 调用clusterAddSlot将slot添加到最终所属的节点中
  6. 在目标节点上执行命令时,如果slot所属节点为当前节点,并且importing_slots_from[slot]不为空, importing_slots_from[slot]中记录了slot是从哪个节点迁移过来,此时数据已经迁移完毕,清除 importing_slots_from[slot]中的迁入信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
void clusterCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}

if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
// ...
}
// ...
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { // 处理setslot命令
int slot;
clusterNode *n;

if (nodeIsSlave(myself)) {
addReplyError(c,"Please use SETSLOT only with masters.");
return;
}

if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
// migrating处理
// ...
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
// importing处理
// ...
} else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
// stable处理
// ...
} else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> 命令处理 */
// 根据nodeid查找节点
clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
// 如果未查询到,返回错误信息
if (!n) {
addReplyErrorFormat(c,"Unknown node %s",
(char*)c->argv[4]->ptr);
return;
}
/* 如果slot已经在当前节点,但是根据node id查找到的节点不是当前节点,返回错误信息*/
if (server.cluster->slots[slot] == myself && n != myself) {
if (countKeysInSlot(slot) != 0) {
addReplyErrorFormat(c,
"Can't assign hashslot %d to a different node "
"while I still hold keys for this hash slot.", slot);
return;
}
}

/* 在源节点上执行命令时 */
/* 如果slot中key的数量为0,表示slot上的数据都已迁移完毕,而migrating_slots_to[slot]记录了slot迁移到的目标节点,既然数据已经迁移完成此时可以将迁移信息清除*/
if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;// 清除迁移信息
// 先删除slot
clusterDelSlot(slot);
// 添加slot到节点n
clusterAddSlot(n,slot);

/* 在目标节点上执行命令时 */
/* 如果slot所属节点为当前节点,并且importing_slots_from[slot]不为空, importing_slots_from[slot]中记录了slot是从哪个节点迁移过来*/
if (n == myself &&
server.cluster->importing_slots_from[slot])
{
/* 更新节点的configEpoch */
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
serverLog(LL_WARNING,
"configEpoch updated after importing slot %d", slot);
}
// 清除importing_slots_from[slot]迁移信息
server.cluster->importing_slots_from[slot] = NULL;
/* 广播PONG消息,让其他节点尽快知道slot的最新信息 */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
}
} else {
addReplyError(c,
"Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
return;
}
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
addReply(c,shared.ok);
}
// ...
else {
addReplySubcommandSyntaxError(c);
return;
}
}

参考

极客时间 - Redis源码剖析与实战(蒋德钧)

Redis版本:redis-6.2.5