IO线程初始化
Redis在6.0版本中引入了多线程,提高IO请求处理效率。
在Redis Server启动函数main(server.c文件)中初始化服务之后,又调用了InitServerLast函数:1
2
3
4
5
6
7
8
9
10
11
12int main(int argc, char **argv) {
// ...
// 初始化服务
initServer();
// ...
// InitServerLast
InitServerLast();
// ...
// 事件循环
aeMain(server.el);
// ...
}
InitServerLast函数在server.c文件中,它调用了initThreadedIO函数对IO线程初始化:
1 | void InitServerLast() { |
initThreadedIO
initThreadedIO的实现在networking.c文件中:
- 初始化全局变量 server.io_threads_active线程活跃状态为0,表示未激活IO多线程
- 对server.io_threads_num的值进行判断,io_threads_num表示设置的IO线程数量
- 如果线程数设置为1,表示不开启多线程直接返回即可
- 如果线程数超过了IO_THREADS_MAX_NUM设置的最大值(128),则报错并停止redis服务
- 根据线程数的设置创建线程
- 初始化io_threads_list[i],io_threads_list是一个数组,数组中的每一个元素是一个list,里面存储每个线程要处理的客户端列表,下标为0的元素也就是io_threads_lis[0]存储的是主线程要处理的客户端列表,这里先调用listCreate创建列表,为io_threads_list[i]初始化
- 初始化io_threads_pending[i]为0,io_threads_pending数组存储每个线程等待处理的客户端个数
- 调用pthread_create创建线程,并传入了线程的运行函数IOThreadMain,之后将线程保存在io_threads中,io_threads数组存储了创建的线程描述符
1 |
|
io_threads_list
1 | /* io_threads_list存储每个线程要处理的客户端 */ |
io_threads
1 | /* 存储创建的线程*/ |
io_threads_pending
1 | /* 存储每个线程要等待处理的客户端个数 */ |
IO_THREADS_MAX_NUM定义
1 |
初始化流程图
IO线程运行函数
IO线程运行函数IOThreadMain在networking.c文件中,函数的入参传入的是线程id,它开启了一个while(1)循环,主要处理逻辑如下:
- 从io_threads_list数组中获取当前线程id要处理的客户端列表,放入到列表迭代器li中
- 遍历迭代器,获取每一个待处理的客户端client,根据io_threads_op线程的操作状态判断读写状态
- 如果是写状态,调用调用writeToClient处理
- 如果是读状态,调用readQueryFromClient处理
1 | void *IOThreadMain(void *myid) { |
io_threads_list数组中存储了每一个线程要处理的客户端列表,在线程运行函数IOThreadMain中,获取待处理的客户端列表,遍历每一个客户端,根据读写类型调用不同的方法进行处理,接下来就去看下Redis在何时将待处理的客户端加入到io_threads_list列表中的。
延迟读写操作
Redis在处理客户端读事件和写事件时会根据一定条件推迟客户端的读取操作或者往客户端写数据操作,将待处理的读客户端和待处理的写客户端分别加入到全局变量server的clients_pending_read和clients_pending_write列表中,全局变量server对应的结构体为redisServer:
全局变量server定义,在server.c文件:
1 | /* 全局变量server */ |
redisServer的结构体定义在server.h中:
1 | struct redisServer { |
推迟客户端读操作
readQueryFromClient
readQueryFromClient主要处理从客户端读取数据,在networking.c中实现,里面调用了postponeClientRead函数判断是否需要推迟客户端的读取操作 :
1 | void readQueryFromClient(connection *conn) { |
postponeClientRead
postponeClientRead函数用于判断是否延迟从客户端读取数据,包含四个条件:
- server.io_threads_active为1,表示激活了IO多线程
- server.io_threads_do_reads为1,表示IO多线程可以延迟执行客户端的读取操作,在配置文件中定义,可以通过修改配置文件来开启延迟读取客户端数据
- ProcessingEventsWhileBlocked值为0,processEventsWhileBlokced函数在执行时会将ProcessingEventsWhileBlocked的值置为1,执行完毕后置为0,Redis在读取RDB或者AOF文件时会调用processEventsWhileBlokced函数,为了避免读取RDB或AOF文件时阻塞无法及时处理请求,processEventsWhileBlokced函数在执行时不能推迟客户端数据读取。
- 客户端的现有标识不能有CLIENT_MASTER、CLIENT_SLAVE、CLIENT_PENDING_READ、CLIENT_BLOCKED等状态
- CLIENT_MASTER、CLIENT_SLAVE表示是用于主从复制的客户端
- CLIENT_PENDING_READ表示客户端本身已经是推迟读取状态
- CLIENT_BLOCKED表示客户端是阻塞状态
满足以上四个条件时将推迟从客户端读取数据,会将客户端标识置为CLIENT_PENDING_READ延迟读状态,并将待读取数据的客户端client加入到server.clients_pending_read中。
1 | int postponeClientRead(client *c) { |
推迟客户端写操作
在往客户端写数据的addReply(networking.c)函数中,调用了prepareClientToWrite判断是否准备往客户端写数据:
1 | void addReply(client *c, robj *obj) { |
prepareClientToWrite
prepareClientToWrite(networking.c)中,首先对客户端标识状态进行了一系列的判断,然后调用了clientHasPendingReplies函数判断输出缓冲区是否有还有数据等待写回到客户端,如果没有,判断客户端的标识是否是CLIENT_PENDING_READ已延迟读,如果不是CLIENT_PENDING_READ状态,调用clientInstallWriteHandler处理:
1 | int prepareClientToWrite(client *c) { |
clientInstallWriteHandler
clientInstallWriteHandler(networking.c)函数中对是否推迟客户端写操作进行了判断:
- 客户端标识不是CLIENT_PENDING_WRITE,对应条件为!(c->flags & CLIENT_PENDING_WRITE),表示客户端本身不是推迟写状态
- 客户端未在进行主从复制(对应条件为c->replstate == REPL_STATE_NONE) 或者 客户端是主从复制的从节点,但全量复制的 RDB 文件已经传输完成,客户端可以接收请求(对应条件 !c->repl_put_online_on_ack))
满足以上两个条件时将推迟客户端写操作,将客户端的标识置为延迟写CLIENT_PENDING_WRITE状态,并将客户端加入到待写回的列表server.clients_pending_write中。
1 |
|
IO线程的分配
上面我们已经知道了IO线程的初始化、IO线程的运行函数IOThreadMain主要处理逻辑,以及延迟读写的客户端是何时分别加入到server全局变量的clients_pending_read和clients_pending_write中的,接下来去看下时何时为客户端分配线程。
在aeProcessEvents处理事件的函数中,等待事件产生之前,调用了beforeSleep(networking.c)方法,beforeSleep中又调用了handleClientsWithPendingReadsUsingThreads为延迟读取操作的客户端分配线程:
1 | void beforeSleep(struct aeEventLoop *eventLoop) { |
延迟读操作的客户端分配线程
handleClientsWithPendingReadsUsingThreads
handleClientsWithPendingReadsUsingThreads(networking.c)主要逻辑如下:
从server.clients_pending_read获取延迟读取操作的客户端,将其加入到迭代列表
遍历延迟读操作的客户端列表,获取每一个待处理的客户端client,item_id表示每个客户端的序号,从0开始,每处理一个客户端就增1,用序号对线程数server.io_threads_num取模,得到一个target_id,客户端会被加入到io_threads_list[target_id]对应的列表中,也就是使用取模的方式轮询为每一个客户端分配对应线程,然后将客户端加入到该线程待处理的客户端列表中,此时客户端已分配到线程,在线程的运行函数IOThreadMain会调用readQueryFromClient处理客户端数据,需要注意多线程只是从客户端数据读取数据解析命令,并不会执行命令,在processInputBuffer中可以看到在IO多线程下只会将flags状态标记为CLIENT_PENDING_COMMAND,不会执行processCommandAndResetClient函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
// 省略...
if (c->argc == 0) {
resetClient(c);
} else {
/* 在IO多线程情况下不能在这里执行命令,所以在这里将client标记为CLIENT_PENDING_COMMAND然后返回,等待主线程同步执行命令 */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
/* 准备执行命令 */
if (processCommandAndResetClient(c) == C_ERR) {
return;
}
}
}
// 省略...
}将io_threads_op线程操作状态置为读操作
遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数
获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据,因为当前执行handleClientsWithPendingReadsUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端
主线程遍历io_threads_list[0]中每一个待处理的客户端,调用readQueryFromClient处理,从客户端读取数据
主线程开启一个while(1)循环等待其他IO线程处理完毕,结束条件是pending为0,pending记录了所有线程要处理的客户端数量总和,在前面IOThreadMain函数中可以看到线程在处理完毕之后会将对应io_threads_pending数组中记录的个数置为0,当pending为0表示所有的线程都已将各自复制的客户端数据处理完毕
主线程开启while循环准备执行客户端命令(注意这里才开始执行命令,多线程只负责解析不负责执行),循环条件是server.clients_pending_read列表的长度不为0,主线程需要保证客户端的请求顺序,所从clients_pending_read列表中的第一个元素开始向后遍历:
(1)调用listNodeValue获取列表中的元素,也就是待处理的客户端client
(2)调用listDelNode将获取到的元素从列表删除,因为在第7步中,主线程已经等待其他所有的线程执行完毕,此时所有的线程已经将各自负责的客户端数据处理完成,所以可以将客户端从server.clients_pending_read中移除
(3)调用processPendingCommandsAndResetClient函数判断客户端标识是否是CLIENT_PENDING_COMMAND状态,CLIENT_PENDING_COMMAND状态表示客户端的请求命令已经被IO线程解析(processInputBuffer方法中可以看到状态被标记为CLIENT_PENDING_COMMAND),可以开始执行命令,接着调用processCommandAndResetClient函数执行客户端发送的请求命令
(4)由于客户端输入缓冲区可能有其他的命令未读取,这里调用processInputBuffer处理输入缓冲区数据继续解析命令并执行
1 | int handleClientsWithPendingReadsUsingThreads(void) { |
processPendingCommandsAndResetClient
processPendingCommandsAndResetClient函数在networking.c中,它先判断客户端标识是否是CLIENT_PENDING_COMMAND状态,CLIENT_PENDING_COMMAND状态表示客户端的请求命令已经被IO线程解析,可以被执行,所以如果处于CLIENT_PENDING_COMMAND状态,接下来会调用processCommandAndResetClient函数处理客户端命令,具体是调用processCommand函数执行命令的:
1 | /*processPendingCommandsAndResetClient函数(networking.c中) */ |
processCommand
processCommand函数在server.c文件中,它调用了addReply函数将需要返回给客户端的数据先写入缓冲区:
1 | int processCommand(client *c) { |
数据读取的整体过程如下,IO多线程只是负责从客户端读取数据解析命令,执行命令的过程仍然是单线程的:
延迟写操作的客户端分配线程
handleClientsWithPendingWritesUsingThreads
延迟写操作的客户端分配线程在handleClientsWithPendingWritesUsingThreads中实现(networking.c),处理逻辑与handleClientsWithPendingReadsUsingThreads类似:
从server.clients_pending_write获取延迟写操作的客户端,将其加入到迭代列表
遍历延迟写操作的客户端列表,获取每一个待处理的客户端client,使用取模的方式轮询为每一个客户端分配线程,然后将客户端加入到该线程待处理的客户端列表中,此时客户端已分配到线程,在线程的运行函数IOThreadMain会处待写回数据的客户端
将io_threads_op线程操作状态置为写操作
遍历线程数,获取每一个线程要处理的客户端个数,将其设置到线程对应的io_threads_pending[j]中,io_threads_pending数组中记录了每个线程等待处理的客户端个数
获取io_threads_list[0]中待处理的客户端列表,io_threads_list[0]存储的是主线程的数据,因为当前执行handleClientsWithPendingWritesUsingThreads函数的线程正是主线程,所以让主线程来处理io_threads_list[0]中存放的待处理客户端
主线程遍历io_threads_list[0]中每一个待处理的客户端,调用writeToClient往客户端写数据
主线程开启一个while(1)循环等待其他IO线程处理完毕
主线程开启while循环,循环条件是server.clients_pending_write列表的长度不为0,遍历clients_pending_write中待处理的写客户端:
(1)调用listNodeValue获取待处理的客户端client
(2)判断缓冲区数据是否全部写回到客户端,如果未全部写回调用connSetWriteHandler向内核注册写事件监听,回调函数为sendReplyToClient,待事件循环流程再次执行时,注册的可写事件会通过回调函数sendReplyToClient 处理,把缓冲区中的数据写回客户端。
调用listEmpty函数清空server.clients_pending_write列表
1 | int handleClientsWithPendingWritesUsingThreads(void) { |
connSetWriteHandler
connSetWriteHandler函数在connection.c文件中,它通过set_write_handler注册了写handler,set_write_handler对应的是connSocketSetWriteHandler函数,所以connSetWriteHandler会被映射为connSocketSetWriteHandler,connSocketSetWriteHandler函数调用了aeCreateFileEvent向内核中注册可写事件监听,上面可知回调函数为sendReplyToClient ,等事件循环流程再次执行时,handleClientsWithPendingWritesUsingThreads 函数注册的可写事件会通过回调函数sendReplyToClient 处理,把缓冲区中的数据写回客户端。
1 | ConnectionType CT_Socket = { |
总结
参考
Redis版本:redis-6.2.5