aeEventLoop初始化
在server.c文件的initServer函数中,对aeEventLoop进行了初始化:
- 调用aeCreateEventLoop函数创建aeEventLoop结构体,对aeEventLoop结构体中的变量进行了初始化,之后调用了aeApiCreate函数创建epoll实例
- 调用aeCreateFileEvent函数向内核注册监听事件,由参数可知,注册的是对TCP文件描述符的可读事件监听,回调函数是acceptTcpHandler,当内核监听到TCP文件描述符有可读事件时,Redis将调用acceptTcpHandler函数对事件进行处理
1 | void initServer(void) { |
在aeCreateEventLoop函数调用时,传入的最大文件描述符个数为客户端最大连接数+宏定义CONFIG_FDSET_INCR的大小,CONFIG_FDSET_INCR的定义在server.h中:
1 |
aeEventLoop结构体创建
aeEventLoop结构体定义,在ae.h中:
1 | typedef struct aeEventLoop { |
aeCreateEventLoop
aeEventLoop结构体创建在aeCreateEventLoop函数中(ae.c文件):
- 分配aeEventLoop结构体所需内存
- 分配aeEventLoop结构体中其他变量所需内存
- 调用aeApiCreate函数创建epoll实例
- 对IO事件集合events的mask掩码初始化为AE_NONE,表示当前没有事件监听
1 | aeEventLoop *aeCreateEventLoop(int setsize) { |
创建epoll实例
aeApiState结构体定义,在ae_epoll.c中:
epfd:创建的epoll实例文件描述符
events:记录文件描述符产生的事件
1 | typedef struct aeApiState { |
aeApiCreate
epoll实例的的创建在aeApiCreate函数(ae_epoll.c文件)中,处理逻辑如下:
为aeApiState结构体分配内存空间
为aeApiState中的events分配内存空间,events数组个数为eventLoop中的最大文件描述个数
调用epoll_create函数创建epoll实例,将返回的epoll文件描述符保存在epfd中
- 将eventLoop的apidata指向创建的aeApiState,之后就可以通过eventLoop获取到epoll实例并且注册监听事件了
1 | static int aeApiCreate(aeEventLoop *eventLoop) { |
注册事件
IO 事件的数据结构是 aeFileEvent 结构体,在ae.c中定义:
mask:事件类型掩码,共有READABLE、WRITABLE、BARRIER三种事件,分别为可读事件、可写事件和屏障事件
rfileProc:写事件回调函数
wfileProc:读事件回调函数
1 | typedef struct aeFileEvent { |
aeCreateFileEvent
aeCreateFileEvent函数在ae.c文件中,主要处理逻辑如下:
- 根据传入的文件描述符,在eventLoop中获取对应的IO事件aeFileEvent fe
- 调用aeApiAddEvent方法注册要监听的事件
- 设置读写事件的回调函数
1 | int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, |
aeApiAddEvent
aeApiAddEvent用于注册事件(ae_epoll.c文件中):
- 从eventLoop获取aeApiState,因为aeApiState中的epfd记录了epoll实例
- 创建了epoll_event类型的变量ee,用于记录操作类型、要监听的文件描述符以及事件类型,在调用函数时使用
- 根据掩码mask判断操作类型,如果文件描述符还未设置监听事件mask掩码为AE_NONE, 类型设置为添加,否则设置为修改,操作类型有如下三种:
- EPOLL_CTL_ADD:用于向epoll添加监听事件
- EPOLL_CTL_MOD:用于修改已经注册过的监听事件
- EPOLL_CTL_ADD:用于删除监听事件
- 将redis的可读、可写事件类型转换为epoll的类型,读事件类型为EPOLLIN,写事件为EPOLLOUT,并设置到ee的events中
- 调用epoll_ctl函数添加文件描述符的监听事件,参数分别为epoll实例、操作类型、要监听的文件描述符、epoll_event类型变量ee
1 | static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { |
总结
Redis在启动时,调用aeCreateEventLoop创建aeEventLoop结构体和epoll实例,之后调用aeCreateFileEvent函数向内核注册TCP文件描述符的监听事件,当有客户端连接Redis服务时,TCP文件描述符产生可读事件,通过epoll可以获取产生事件的文件描述符,Redis就可以对连接请求进行处理。1
2
3
4
5// server.el是eventLoop
// server.ipfd[j]是监听端口的文件描述符
// AE_READABLE是读事件
// acceptTcpHandler是事件产生时的回调函数
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)
事件处理
aeMain函数在ae.c文件中,里面是一个while循环,它的处理逻辑如下:
- 通过eventLoop的stop判断是否处于停止状态,如果非停止状态进入第2步
- 判断eventLoop的beforesleep是否为空,如果不为空,调用beforesleep函数
- 调用了aeProcessEvents函数处理IO事件
1 | void aeMain(aeEventLoop *eventLoop) { |
aeProcessEvents
aeProcessEvents函数在ae.c文件中,处理逻辑如下:
- 调用aeApiPoll函数等待就绪的事件,如果有事件产生,返回就绪的文件描述符个数,aeApiPoll函数中对就绪文件描述符处理时将其放在了fired中
- for循环中处理就绪的事件,通过fired可以获取到每一个产生事件的文件描述符fd,根据文件描述符fd可以在eventLoop的events中获取对应的事件aeFileEvent,aeFileEvent中记录了事件的回调函数,之后根据事件类型,调用对应的回调函数,调用回调函数的入参分别为eventLoop、文件描述符、aeFileEvent的clientData、事件类型掩码
1 | int aeProcessEvents(aeEventLoop *eventLoop, int flags) |
aeApiPoll
aeApiPoll处理就绪的事件:
调用IO多路复用epoll_wait函数等待事件的产生,epoll_wait函数需要传入epoll实例、记录就绪事件集合的epoll_event,这两个参数分别在aeApiState的epfd和events中,当监听的文件描述符有事件产生时,epoll_wait返回就绪的文件描述符个数
对epoll_wait返回的就绪事件进行处理,事件记录在events变量中,遍历每一个就绪的事件,将事件对应的文件描述符设置在eventLoop的fire中,后续通过fire对事件进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
// 获取aeApiState,aeApiState记录了epoll实例,events记录了产生的事件
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 等待事件的产生,epoll_wait返回就绪的文件描述符个数,就绪的事件记录在state->events中
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
// 处理返回的就绪事件
for (j = 0; j < numevents; j++) {
int mask = 0;
// 获取每一个就绪的事件
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
// 将就绪事件的文件描述符设置到已触发的事件fired的fd中
eventLoop->fired[j].fd = e->data.fd;
// 设置事件类型掩码
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
处理客户端连接
acceptTcpHandler
由上面的调用可知,Redis在启动时,注册了AE_READABLE读事件,回调函数为acceptTcpHandler(network.c文件中)用于处理客户端连接,当有客户端与Redis连接时,epoll返回就绪的文件描述符,Redis在处理就绪的事件时调用acceptTcpHandler进行处理:
- 调用anetTcpAccept建立连接,并返回已连接的套接字文件描述符cfd
- 调用acceptCommonHandler(network.c文件中)函数,它又调用了createClient函数,在createClient函数中调用了aeCreateFileEvent,向内核注册已连接套接字的可读监听事件
1 | void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
createClient
createClient函数中调用了aeCreateFileEvent方法向内核中注册可读事件,上文可知传入的描述符是已连接套接字cfd,回调函数为readQueryFromClient,此时事件驱动框架增加了对客户端已连接套接字的监听,当客户端有数据发送到服务端时,Redis调用readQueryFromClient函数处理读事件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 注册已连接套接字的可读事件,回调函数为readQueryFromClient
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// ...
}
处理读事件
readQueryFromClient
readQueryFromClient函数在network.c文件中,是可读事件的回调函数,用于处理已连接套接字上的读事件,处理逻辑如下:
- 从已连接的套接字中读取客户端的请求数据到输入缓冲区
- 调用processInputBufferAndReplicate函数处理输入缓冲区的数据
1 | // aeProcessEvents中调用回调函数时,传入的参数分别为aeEventLoop、已连接套接字的文件描述符、aeFileEvent的clientData私有数据、事件类型掩码 |
处理写事件
在aeMain调用aeProcessEvents之前,先调用了beforeSleep方法,beforeSleep中又调用了handleClientsWithPendingWrites,它会将Redis Server缓冲区的数据写回到客户端:1
2
3
4
5
6
7
8
9void beforeSleep(struct aeEventLoop *eventLoop) {
// 省略...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
// 省略...
}.
handleClientsWithPendingWrites
Redis Server收到客户端的请求命令后,需要处理请求,然后将要返回的数据写回到客户端,写回到客户端的逻辑在handleClientsWithPendingWrites函数中,处理逻辑如下:
- 获取待写回数据的客户端列表
- 遍历每一个待写回数据的客户端,调用writeToClient方法将缓冲区的数据写到客户端socket中,然后调用clientHasPendingReplies方法判断数据是否全部写回,如果为否,则调用aeCreateFileEvent向内核注册客户端文件描述符的可写事件监听,交由回调函数sendReplyToClient处理
1 | int handleClientsWithPendingWrites(void) { |
clientHasPendingReplies
有时由于网络原因或者其他原因,可能只发出去了部分数据,客户端如果一直未从缓冲区读取数据,在缓冲区已满的情况,服务端将无法往客户端发送数据,所以调用clientHasPendingReplies函数判断数据是否写回完毕,如果未写回完毕交由事件循环驱动处理,提高处理效率。
整体流程图
总结
参考
【osc_avxkth26】Redis 网络通信模块源码分析(3)
Redis版本:redis-5.0.8