【Redis】事件驱动源码分析(多线程)

IO线程初始化

Redis在6.0版本中引入了多线程,提高IO请求处理效率。

在Redis Server启动函数main(server.c文件)中初始化服务之后,又调用了InitServerLast函数:

1
2
3
4
5
6
7
8
9
10
11
12
int main(int argc, char **argv) {
// ...
// 初始化服务
initServer();
// ...
// InitServerLast
InitServerLast();
// ...
// 事件循环
aeMain(server.el);
// ...
}

InitServerLast函数在server.c文件中,它调用了initThreadedIO函数对IO线程初始化:

1
2
3
4
5
6
7
void InitServerLast() {
bioInit();
// 初始化IO线程
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}

initThreadedIO

initThreadedIO的实现在networking.c文件中:

  1. 初始化全局变量 server.io_threads_active线程活跃状态为0,表示未激活IO多线程
  2. 对server.io_threads_num的值进行判断,io_threads_num表示设置的IO线程数量
    • 如果线程数设置为1,表示不开启多线程直接返回即可
    • 如果线程数超过了IO_THREADS_MAX_NUM设置的最大值(128),则报错并停止redis服务
  3. 根据线程数的设置创建线程
    • 初始化io_threads_list[i],io_threads_list是一个数组,数组中的每一个元素是一个list,里面存储每个线程要处理的客户端列表下标为0的元素也就是io_threads_lis[0]存储的是主线程要处理的客户端列表,这里先调用listCreate创建列表,为io_threads_list[i]初始化
    • 初始化io_threads_pending[i]为0,io_threads_pending数组存储每个线程等待处理的客户端个数
    • 调用pthread_create创建线程,并传入了线程的运行函数IOThreadMain,之后将线程保存在io_threads中,io_threads数组存储了创建的线程描述符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

/* 初始化线程 */
void initThreadedIO(void) {
server.io_threads_active = 0; /* 初始化线程活跃状态为0,表示未激活IO多线程 */

/* 如果IO线程数为1,直接返回即可 */
if (server.io_threads_num == 1) return;
/* 如果IO线程数超过了最大限制,打印错误,停止redis服务 */
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}

/* 根据线程数设置创建线程 */
for (int i = 0; i < server.io_threads_num; i++) {
/* 创建List */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* 下标为0的存储的是主线程 */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
// 初始化待处理的客户端数量为0
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
// 创建线程, 线程的运行函数为IOThreadMain
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
/* 将创建的线程加入io_threads线程组中*/
io_threads[i] = tid;
}
}

// setIOPendingCount在networking.c文件
static inline void setIOPendingCount(int i, unsigned long count) {
// 设置io_threads_pending[i]的值为count
atomicSetWithSync(io_threads_pending[i], count);
}

io_threads_list

1
2
/* io_threads_list存储每个线程要处理的客户端 */
list *io_threads_list[IO_THREADS_MAX_NUM];

io_threads

1
2
/* 存储创建的线程*/
pthread_t io_threads[IO_THREADS_MAX_NUM];

io_threads_pending

1
2
/* 存储每个线程要等待处理的客户端个数 */
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

IO_THREADS_MAX_NUM定义

1
#define IO_THREADS_MAX_NUM 128

初始化流程图

IO线程运行函数

IO线程运行函数IOThreadMain在networking.c文件中,函数的入参传入的是线程id,它开启了一个while(1)循环,主要处理逻辑如下:

  1. 从io_threads_list数组中获取当前线程id要处理的客户端列表,放入到列表迭代器li中
  2. 遍历迭代器,获取每一个待处理的客户端client,根据io_threads_op线程的操作状态判断读写状态
    • 如果是写状态,调用调用writeToClient处理
    • 如果是读状态,调用readQueryFromClient处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void *IOThreadMain(void *myid) {
/* myid是线程ID,从0开始,到 server.iothreads_num-1,0号线程存储的是主线程 */
long id = (unsigned long)myid;
char thdname[16];

snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
// 循环
while(1) {
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}

serverAssert(getIOPendingCount(id) != 0);

listIter li;
listNode *ln;
// 获取每一个IO线程要处理的客户端,将其放入到迭代器li,这里的id指的线程id
listRewind(io_threads_list[id],&li);
// 遍历列表
while((ln = listNext(&li))) {
// 获取每一个待处理的客户端
client *c = listNodeValue(ln);
// 如果是写事件
if (io_threads_op == IO_THREADS_OP_WRITE) {
// 调用writeToClient处理
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// 如果是读事件,调用readQueryFromClient
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
// 处理完毕后,io_threads_pending数组中对应的数量设置为0,表示所有客户端已处理完毕
setIOPendingCount(id, 0);
}
}

io_threads_list数组中存储了每一个线程要处理的客户端列表,在线程运行函数IOThreadMain中,获取待处理的客户端列表,遍历每一个客户端,根据读写类型调用不同的方法进行处理,接下来就去看下Redis在何时将待处理的客户端加入到io_threads_list列表中的。

延迟读写操作

Redis在处理客户端读事件和写事件时会根据一定条件推迟客户端的读取操作或者往客户端写数据操作,将待处理的读客户端和待处理的写客户端分别加入到全局变量server的clients_pending_read和clients_pending_write列表中,全局变量server对应的结构体为redisServer:

全局变量server定义,在server.c文件:

1
2
/* 全局变量server */
struct redisServer server;

redisServer的结构体定义在server.h中:

1
2
3
4
5
6
struct redisServer {

list *clients_pending_write; /* list类型,记录延迟写回数据的客户端 */
list *clients_pending_read; /* list类型,记录延迟读取数据的客户端*/
// 省略...
}

推迟客户端读操作

readQueryFromClient

readQueryFromClient主要处理从客户端读取数据,在networking.c中实现,里面调用了postponeClientRead函数判断是否需要推迟客户端的读取操作 :

1
2
3
4
5
6
7
8
9
10
11
12
13
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;

/* 判断是否需要推迟客户端的读取操作 */
if (postponeClientRead(c)) return;

// 省略...

// 处理数据执行命令
processInputBuffer(c);
}

postponeClientRead

postponeClientRead函数用于判断是否延迟从客户端读取数据,包含四个条件:

  1. server.io_threads_active为1,表示激活了IO多线程
  2. server.io_threads_do_reads为1,表示IO多线程可以延迟执行客户端的读取操作,在配置文件中定义,可以通过修改配置文件来开启延迟读取客户端数据
  3. ProcessingEventsWhileBlocked值为0,processEventsWhileBlokced函数在执行时会将ProcessingEventsWhileBlocked的值置为1,执行完毕后置为0,Redis在读取RDB或者AOF文件时会调用processEventsWhileBlokced函数,为了避免读取RDB或AOF文件时阻塞无法及时处理请求,processEventsWhileBlokced函数在执行时不能推迟客户端数据读取。
  4. 客户端的现有标识不能有CLIENT_MASTER、CLIENT_SLAVE、CLIENT_PENDING_READ、CLIENT_BLOCKED等状态
    • CLIENT_MASTER、CLIENT_SLAVE表示是用于主从复制的客户端
    • CLIENT_PENDING_READ表示客户端本身已经是推迟读取状态
    • CLIENT_BLOCKED表示客户端是阻塞状态

满足以上四个条件时将推迟从客户端读取数据,会将客户端标识置为CLIENT_PENDING_READ延迟读状态,并将待读取数据的客户端client加入到server.clients_pending_read中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
{
c->flags |= CLIENT_PENDING_READ;
// 将客户端加入到clients_pending_read链表中
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}

推迟客户端写操作

在往客户端写数据的addReply(networking.c)函数中,调用了prepareClientToWrite判断是否准备往客户端写数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void addReply(client *c, robj *obj) {
// 调用prepareClientToWrite往客户端写数据
if (prepareClientToWrite(c) != C_OK) return;

if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}

prepareClientToWrite

prepareClientToWrite(networking.c)中,首先对客户端标识状态进行了一系列的判断,然后调用了clientHasPendingReplies函数判断输出缓冲区是否有还有数据等待写回到客户端,如果没有,判断客户端的标识是否是CLIENT_PENDING_READ已延迟读,如果不是CLIENT_PENDING_READ状态,调用clientInstallWriteHandler处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int prepareClientToWrite(client *c) {
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

if ((c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

if (!c->conn) return C_ERR;

/*
* 如果缓冲区的数据都已写回到客户端并且客户端标识不是推迟读状态
*/
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
clientInstallWriteHandler(c);// 调用clientInstallWriteHandler

return C_OK;
}

clientInstallWriteHandler

clientInstallWriteHandler(networking.c)函数中对是否推迟客户端写操作进行了判断

  1. 客户端标识不是CLIENT_PENDING_WRITE,对应条件为!(c->flags & CLIENT_PENDING_WRITE),表示客户端本身不是推迟写状态
  2. 客户端未在进行主从复制(对应条件为c->replstate == REPL_STATE_NONE) 或者 客户端是主从复制的从节点,但全量复制的 RDB 文件已经传输完成,客户端可以接收请求(对应条件 !c->repl_put_online_on_ack))

满足以上两个条件时将推迟客户端写操作,将客户端的标识置为延迟写CLIENT_PENDING_WRITE状态,并将客户端加入到待写回的列表server.clients_pending_write中。

1
2
3
4
5
6
7
8
9
10
11
12
13

void clientInstallWriteHandler(client *c) {
/* 如果客户端的标识不是推迟写状态,并且客户端未在进行主从复制或者客户端是主从复制的从节点并能接收请求 */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* 将客户端的标识置为延迟写 */
c->flags |= CLIENT_PENDING_WRITE;
// 将客户端加入到待写回的列表clients_pending_write中
listAddNodeHead(server.clients_pending_write,c);
}
}

IO线程的分配

上面我们已经知道了IO线程的初始化、IO线程的运行函数IOThreadMain主要处理逻辑,以及延迟读写的客户端是何时分别加入到server全局变量的clients_pending_read和clients_pending_write中的,接下来去看下时何时为客户端分配线程。

在aeProcessEvents处理事件的函数中,等待事件产生之前,调用了beforeSleep(networking.c)方法,beforeSleep中又调用了handleClientsWithPendingReadsUsingThreads为延迟读取操作的客户端分配线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);

// 省略...

handleBlockedClientsTimeout();

/* 调用了handleClientsWithPendingReadsUsingThreads为延迟读客户端分配线程 */
handleClientsWithPendingReadsUsingThreads();

// 省略...

/* 调用了handleClientsWithPendingWritesUsingThreads为延迟写客户端分配线程 */
handleClientsWithPendingWritesUsingThreads();

// 省略...
}

延迟读操作的客户端分配线程

handleClientsWithPendingReadsUsingThreads

handleClientsWithPendingReadsUsingThreads(networking.c)主要逻辑如下:

  1. 从server.clients_pending_read获取延迟读取操作的客户端,将其加入到迭代列表

  2. 遍历延迟读操作的客户端列表,获取每一个待处理的客户端client,item_id表示每个客户端的序号,从0开始,每处理一个客户端就增1,用序号对线程数server.io_threads_num取模,得到一个target_id,客户端会被加入到io_threads_list[target_id]对应的列表中,也就是使用取模的方式轮询为每一个客户端分配对应线程,然后将客户端加入到该线程待处理的客户端列表中,此时客户端已分配到线程,在线程的运行函数IOThreadMain会调用readQueryFromClient处理客户端数据,需要注意多线程只是从客户端数据读取数据解析命令,并不会执行命令,在processInputBuffer中可以看到在IO多线程下只会将flags状态标记为CLIENT_PENDING_COMMAND,不会执行processCommandAndResetClient函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
     void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
    // 省略...
    if (c->argc == 0) {
    resetClient(c);
    } else {
    /* 在IO多线程情况下不能在这里执行命令,所以在这里将client标记为CLIENT_PENDING_COMMAND然后返回,等待主线程同步执行命令 */
    if (c->flags & CLIENT_PENDING_READ) {
    c->flags |= CLIENT_PENDING_COMMAND;
    break;
    }
    /* 准备执行命令 */
    if (processCommandAndResetClient(c) == C_ERR) {
    return;
    }
    }
    }
    // 省略...
    }
  3. 将io_threads_op线程操作状态置为读操作

  4. 遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数

  5. 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据,因为当前执行handleClientsWithPendingReadsUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端

  6. 主线程遍历io_threads_list[0]中每一个待处理的客户端,调用readQueryFromClient处理,从客户端读取数据

  7. 主线程开启一个while(1)循环等待其他IO线程处理完毕,结束条件是pending为0,pending记录了所有线程要处理的客户端数量总和,在前面IOThreadMain函数中可以看到线程在处理完毕之后会将对应io_threads_pending数组中记录的个数置为0,当pending为0表示所有的线程都已将各自复制的客户端数据处理完毕

  8. 主线程开启while循环准备执行客户端命令(注意这里才开始执行命令,多线程只负责解析不负责执行),循环条件是server.clients_pending_read列表的长度不为0,主线程需要保证客户端的请求顺序,所从clients_pending_read列表中的第一个元素开始向后遍历:

    (1)调用listNodeValue获取列表中的元素,也就是待处理的客户端client

    (2)调用listDelNode将获取到的元素从列表删除,因为在第7步中,主线程已经等待其他所有的线程执行完毕,此时所有的线程已经将各自负责的客户端数据处理完成,所以可以将客户端从server.clients_pending_read中移除

    (3)调用processPendingCommandsAndResetClient函数判断客户端标识是否是CLIENT_PENDING_COMMAND状态,CLIENT_PENDING_COMMAND状态表示客户端的请求命令已经被IO线程解析(processInputBuffer方法中可以看到状态被标记为CLIENT_PENDING_COMMAND),可以开始执行命令,接着调用processCommandAndResetClient函数执行客户端发送的请求命令

    (4)由于客户端输入缓冲区可能有其他的命令未读取,这里调用processInputBuffer处理输入缓冲区数据继续解析命令并执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;

listIter li;
listNode *ln;
// 获取待读取的客户端列表clients_pending_read加入到迭代链表中
listRewind(server.clients_pending_read,&li);
int item_id = 0;
// 遍历待读取的客户端
while((ln = listNext(&li))) {
// 获取客户端
client *c = listNodeValue(ln);
// 根据线程数取模,轮询分配线程
int target_id = item_id % server.io_threads_num;
// 分配线程,加入到线程对应的io_threads_list
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}

/* 将线程的操作状态置为读操作*/
io_threads_op = IO_THREADS_OP_READ;
// 遍历线程数
for (int j = 1; j < server.io_threads_num; j++) {
// 获取每个线程待处理客户端的个数
int count = listLength(io_threads_list[j]);
// 将待处理客户端的个数设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程要处理的客户端个数
setIOPendingCount(j, count);
}

/* 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据*/
/* handleClientsWithPendingReadsUsingThreads函数的执行者刚好就是主线程,所以让主线程处理io_threads_list[0]中的数据*/
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 调用readQueryFromClient
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);

/* 等待其他线程处理完毕 */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
// 获取每一个客户端处理的客户端个数
pending += getIOPendingCount(j);
// 如果为0表示所有线程对应的客户端都处理完毕
if (pending == 0) break;
}

/* 再次判断server.clients_pending_read是否有待处理的客户端*/
while(listLength(server.clients_pending_read)) {
// 获取列表第一个元素
ln = listFirst(server.clients_pending_read);
// 获取客户端
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
// 删除节点
listDelNode(server.clients_pending_read,ln);

serverAssert(!(c->flags & CLIENT_BLOCKED));
// processPendingCommandsAndResetClient函数中会判断客户端标识是否是CLIENT_PENDING_COMMAND状态,如果是调用processCommandAndResetClient函数处理请求命令
if (processPendingCommandsAndResetClient(c) == C_ERR) {
continue;
}
// 由于客户端输入缓冲区可能有其他的命令未读取,这里解析命令并执行
processInputBuffer(c);

if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
clientInstallWriteHandler(c);
}

/* Update processed count on server */
server.stat_io_reads_processed += processed;

return processed;
}

processPendingCommandsAndResetClient

processPendingCommandsAndResetClient函数在networking.c中,它先判断客户端标识是否是CLIENT_PENDING_COMMAND状态,CLIENT_PENDING_COMMAND状态表示客户端的请求命令已经被IO线程解析,可以被执行,所以如果处于CLIENT_PENDING_COMMAND状态,接下来会调用processCommandAndResetClient函数处理客户端命令,具体是调用processCommand函数执行命令的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*processPendingCommandsAndResetClient函数(networking.c中) */
int processPendingCommandsAndResetClient(client *c) {
// 判断客户端标识是否是CLIENT_PENDING_COMMAND
if (c->flags & CLIENT_PENDING_COMMAND) {
// 取消CLIENT_PENDING_COMMAND状态
c->flags &= ~CLIENT_PENDING_COMMAND;
// 调用processCommandAndResetClient执行命令
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}
return C_OK;
}

/* processCommandAndResetClient函数(networking.c中) */
int processCommandAndResetClient(client *c) {
int deadclient = 0;
client *old_client = server.current_client;
server.current_client = c;
// 调用processCommand执行命令
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = old_client;
return deadclient ? C_ERR : C_OK;
}

processCommand

processCommand函数在server.c文件中,它调用了addReply函数将需要返回给客户端的数据先写入缓冲区:

1
2
3
4
5
6
7
8
9
10
11
12
int processCommand(client *c) {
// 省略...

if (!strcasecmp(c->argv[0]->ptr,"quit")) {
// 调用addReply函数将需要返回给客户端的数据先写入缓冲区
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}

// 省略...
}

数据读取的整体过程如下,IO多线程只是负责从客户端读取数据解析命令,执行命令的过程仍然是单线程的

延迟写操作的客户端分配线程

handleClientsWithPendingWritesUsingThreads

延迟写操作的客户端分配线程在handleClientsWithPendingWritesUsingThreads中实现(networking.c),处理逻辑与handleClientsWithPendingReadsUsingThreads类似:

  1. 从server.clients_pending_write获取延迟写操作的客户端,将其加入到迭代列表

  2. 遍历延迟写操作的客户端列表,获取每一个待处理的客户端client,使用取模的方式轮询为每一个客户端分配线程,然后将客户端加入到该线程待处理的客户端列表中,此时客户端已分配到线程,在线程的运行函数IOThreadMain会处待写回数据的客户端

  3. 将io_threads_op线程操作状态置为写操作

  4. 遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数

  5. 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据,因为当前执行handleClientsWithPendingWritesUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端

  6. 主线程遍历io_threads_list[0]中每一个待处理的客户端,调用writeToClient往客户端写数据

  7. 主线程开启一个while(1)循环等待其他IO线程处理完毕

  8. 主线程开启while循环,循环条件是server.clients_pending_write列表的长度不为0,遍历clients_pending_write中待处理的写客户端:

    (1)调用listNodeValue获取待处理的客户端client

    (2)判断缓冲区数据是否全部写回到客户端,如果未全部写回调用connSetWriteHandler向内核注册写事件监听,回调函数为sendReplyToClient,待事件循环流程再次执行时,注册的可写事件会通过回调函数sendReplyToClient 处理,把缓冲区中的数据写回客户端。

  9. 调用listEmpty函数清空server.clients_pending_write列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;

if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
if (!server.io_threads_active) startThreadedIO();

listIter li;
listNode *ln;
// 获取待写回客户端列表clients_pending_write加入到迭代链表中
listRewind(server.clients_pending_write,&li);
int item_id = 0;
// 遍历待写的客户端
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
// 根据线程数取模,轮询分配线程
int target_id = item_id % server.io_threads_num;
// 分配线程,加入到对应线程的io_threads_list
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}

/* 将io_threads_op线程操作状态置为写操作 */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
// 设置每个线程需要处理的客户端个数
setIOPendingCount(j, count);
}


/* 获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据*/
/* handleClientsWithPendingWritesUsingThreads函数的执行者刚好就是主线程,所以让主线程处理io_threads_list[0]中的数据*/
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 调用writeToClient往客户写数据
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);

/* 等待其他线程处理完毕 */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}

/* 再次获取server.clients_pending_read所有待写的客户端*/
listRewind(server.clients_pending_write,&li);
// 遍历
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);

/* 如果缓冲区数据未全部写回调用connSetWriteHandler注册可写事件,回调函数为sendReplyToClient*/
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
// 清空clients_pending_write
listEmpty(server.clients_pending_write);

server.stat_io_writes_processed += processed;

return processed;
}

connSetWriteHandler

connSetWriteHandler函数在connection.c文件中,它通过set_write_handler注册了写handler,set_write_handler对应的是connSocketSetWriteHandler函数,所以connSetWriteHandler会被映射为connSocketSetWriteHandler,connSocketSetWriteHandler函数调用了aeCreateFileEvent向内核中注册可写事件监听,上面可知回调函数为sendReplyToClient ,等事件循环流程再次执行时,handleClientsWithPendingWritesUsingThreads 函数注册的可写事件会通过回调函数sendReplyToClient 处理,把缓冲区中的数据写回客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler, // set_write_handler对应connSocketSetWriteHandler函数
.set_read_handler = connSocketSetReadHandler,
.get_last_error = connSocketGetLastError,
.blocking_connect = connSocketBlockingConnect,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine,
.get_type = connSocketGetType
};

/*
* connSetWriteHandler
*/
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
// 注册写handler, set_write_handler对应的是connSocketSetWriteHandler函数
return conn->type->set_write_handler(conn, func, 0);
}

/*
* connSocketSetWriteHandler注册写事件
*/
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
if (func == conn->write_handler) return C_OK;

conn->write_handler = func;
if (barrier)
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
if (!conn->write_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE, // 向内核注册写事件
conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}

总结

参考

极客时间 - Redis源码剖析与实战(蒋德钧)

Redis版本:redis-6.2.5