Redis程序的运行过程是一个处理事件的过程,也称Redis是一个事件驱动的服务。Redis中的事件分两类:文件事件(File Event)、时间事件(Time Event)。文件事件处理文件的读写操作,特别是与客户端通信的Socket文件描述符的读写操作;时间事件主要用于处理一些定时处理的任务。
本文首先介绍Redis的运行过程,阐明Redis程序是一个事件驱动的程序;接着介绍事件机制实现中涉及的数据结构以及事件的注册;最后介绍了处理客户端中涉及到的套接字文件读写事件。
Redis的运行过程是一个事件处理的过程,可以通过下图反映出来:
图1 Redis的事件处理过程
从上图可以看出:Redis服务器的运行过程就是循环等待并处理事件的过程。通过时间事件将运行事件分成一个个的时间分片,如图1的右半部分所示。如果在指定的时间分片中,有文件事件发生,如:读文件描述符可读、写文件描述符可写,则调用相应的处理函数进行文件的读写处理。文件事件处理完成之后,处理期望发生时间在当前时间之前或正好是当前时刻的时间事件。然后再进入下一次循环迭代处理。
如果在指定的事件间隔中,没有文件事件发生,则不需要处理,直接进行时间事件的处理,如下图所示。
图2 Redis的事件处理过程(无文件事件发生)
文件事件数据结构
Redis用如下结构体来记录一个文件事件:
/* File event structure */
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */aeFileProc *rfileProc;aeFileProc *wfileProc;void *clientData;
} aeFileEvent;
通过mask来描述发生了什么事件:
rfileProc和wfileProc分别为读事件和写事件发生时的回调函数,其函数签名如下:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
事件事件数据结构
Redis用如下结构体来记录一个时间事件:
/* Time event structure */
typedef struct aeTimeEvent {long long id; /* time event identifier. */long when_sec; /* seconds */long when_ms; /* milliseconds */aeTimeProc *timeProc;aeEventFinalizerProc *finalizerProc;void *clientData;struct aeTimeEvent *prev;struct aeTimeEvent *next;
} aeTimeEvent;
when_sec和when_ms指定时间事件发生的时间,timeProc为时间事件发生时的处理函数,签名如下:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
prev和next表明时间事件构成了一个双向链表。
事件循环
Redis用如下结构体来记录系统中注册的事件及其状态:
/* State of an event based program */
typedef struct aeEventLoop {int maxfd; /* highest file descriptor currently registered */int setsize; /* max number of file descriptors tracked */long long timeEventNextId;time_t lastTime; /* Used to detect system clock skew */aeFileEvent *events; /* Registered events */aeFiredEvent *fired; /* Fired events */aeTimeEvent *timeEventHead;int stop;void *apidata; /* This is used for polling API specific data */aeBeforeSleepProc *beforesleep;aeBeforeSleepProc *aftersleep;
} aeEventLoop;
这一结构体中,最主要的就是文件事件指针events和时间事件头指针timeEventHead。文件事件指针event指向一个固定大小(可配置)数组,通过文件描述符作为下标,可以获取文件对应的事件对象。
事件驱动的程序实际上就是在事件发生时,调用相应的处理函数(即:回调函数)进行逻辑处理。因此关于事件,程序需要知道:①事件的发生;② 回调函数。事件的注册过程就是告诉程序这两。下面我们分别从文件事件、时间事件的注册过程进行阐述。
文件事件的注册过程
对于文件事件:
这就是文件事件的注册过程,函数的实现如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}aeFileEvent *fe = &eventLoop->events[fd];if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}
这段代码逻辑非常清晰:首先根据文件描述符获得文件事件对象,接着在操作系统中添加自己关心的文件描述符(addApiAddEvent),最后将回调函数记录到文件事件对象中。因此,一个线程就可以同时监听多个文件事件,这就是IO多路复用。操作系统提供多种IO多路复用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支持所有这些模型,用户可以根据需要进行选择。不同的模型,向操作系统添加文件描述符方式也不同,Redis将这部分逻辑封装在aeApiAddEvent中,下面代码是EPOLL模型的实现:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;mask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}
这段代码就是对操作系统调用epoll_ctl()的封装,EPOLLIN对应的是读(输入)事件,EPOLLOUT对应的是写(输出)事件。
时间事件的注册过程
对于时间事件:
对应的事件事件注册函数如下:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData;te->prev = NULL;te->next = eventLoop->timeEventHead;if (te->next)te->next->prev = te;eventLoop->timeEventHead = te;return id;
}
这段代码逻辑也是非常简单:首先创建时间事件对象,接着设置事件,设置回调函数,最后将事件事件对象插入到时间事件链表中。设置时间事件期望发生的时间比较简单:
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {long cur_sec, cur_ms, when_sec, when_ms;aeGetTime(&cur_sec, &cur_ms);when_sec = cur_sec + milliseconds/1000;when_ms = cur_ms + milliseconds%1000;if (when_ms >= 1000) {when_sec ++;when_ms -= 1000;}*sec = when_sec;*ms = when_ms;
}static void aeGetTime(long *seconds, long *milliseconds)
{struct timeval tv;gettimeofday(&tv, NULL);*seconds = tv.tv_sec;*milliseconds = tv.tv_usec/1000;
}
当前时间加上期望的时间间隔,作为事件期望发生的时刻。
Redis为客户端提供存储数据和获取数据的缓存服务,监听并处理来自请求,将结果返回给客户端,这一过程将会发生以下文件事件:
与上图相对应,对于一个请求,Redis会注册三个文件事件:
TCP连接建立事件
服务器初始化时,在服务器套接字上注册TCP连接建立的事件。
void initServer(void) {/* Create an event handler for accepting new connections in TCP and Unix* domain sockets. */for (j &#61; 0; j < server.ipfd_count; j&#43;&#43;) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) &#61;&#61; AE_ERR){serverPanic("Unrecoverable error creating server.ipfd file event.");}}
}
回调函数为acceptTcpHandler&#xff0c;该函数最重要的职责是创建客户端结构。
客户端套接字读事件
创建客户端&#xff1a;在客户端套接字上注册客户端套接字可读事件。
if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) &#61;&#61; AE_ERR)
{close(fd);zfree(c);return NULL;
}
回调函数为readQueryFromClient&#xff0c;顾名思义&#xff0c;此函数将从客户端套接字中读取数据。
向客户端返回数据
Redis完成请求后&#xff0c;Redis并非处理完一个请求后就注册一个写文件事件&#xff0c;然后事件回调函数中往客户端写回结果。根据图1&#xff0c;检测到文件事件发生后&#xff0c;Redis对这些文件事件进行处理&#xff0c;即&#xff1a;调用rReadProc或writeProc回调函数。处理完成后&#xff0c;对于需要向客户端写回的数据&#xff0c;先缓存到内存中&#xff1a;
typedef struct client {// ...其他字段list *reply; /* List of reply objects to send to the client. *//* Response buffer */int bufpos;char buf[PROTO_REPLY_CHUNK_BYTES];
}&#xff1b;
发送给客户端的数据会存放到两个地方&#xff1a;
这里遵循一个原则&#xff1a;只要能存放在buf中&#xff0c;就尽量存入buf字节数组中&#xff0c;如果buf存不下了&#xff0c;才存放在reply对象数组中。
写回发生在进入下一次等待文件事件之前&#xff0c;见图1中【等待前处理】&#xff0c;会调用以下函数来处理客户端数据写回逻辑&#xff1a;
int writeToClient(int fd, client *c, int handler_installed) {while(clientHasPendingReplies(c)) {if (c->bufpos > 0) {nwritten &#61; write(fd,c->buf&#43;c->sentlen,c->bufpos-c->sentlen);if (nwritten <&#61; 0) break;c->sentlen &#43;&#61; nwritten;totwritten &#43;&#61; nwritten;if ((int)c->sentlen &#61;&#61; c->bufpos) {c->bufpos &#61; 0;c->sentlen &#61; 0;}} else {o &#61; listNodeValue(listFirst(c->reply));objlen &#61; o->used;if (objlen &#61;&#61; 0) {c->reply_bytes -&#61; o->size;listDelNode(c->reply,listFirst(c->reply));continue;}nwritten &#61; write(fd, o->buf &#43; c->sentlen, objlen - c->sentlen);if (nwritten <&#61; 0) break;c->sentlen &#43;&#61; nwritten;totwritten &#43;&#61; nwritten;}}
}
上述函数只截取了数据发送部分&#xff0c;首先发送buf中的数据&#xff0c;然后发送reply中的数据。
有读者可能会疑惑&#xff1a;write()系统调用是阻塞式的接口&#xff0c;上述做法会不会在write()调用的地方有等待&#xff0c;从而导致性能低下&#xff1f;这里就要介绍Redis是怎么处理这个问题的。
首先&#xff0c;我们发现创建客户端的代码&#xff1a;
client *createClient(int fd) {client *c &#61; zmalloc(sizeof(client));if (fd !&#61; -1) {anetNonBlock(NULL,fd);}
}
可以看到设置fd是非阻塞&#xff08;NonBlock&#xff09;&#xff0c;这就保证了在套接字fd上的read()和write()系统调用不是阻塞的。
其次&#xff0c;和文件事件的处理操作一样&#xff0c;往客户端写数据的操作也是批量的&#xff0c;函数如下&#xff1a;
int handleClientsWithPendingWrites(void) {listRewind(server.clients_pending_write,&li);while((ln &#61; listNext(&li))) {/* Try to write buffers to the client socket. */if (writeToClient(c->fd,c,0) &#61;&#61; C_ERR) continue;/* If after the synchronous writes above we still have data to* output to the client, we need to install the writable handler. */if (clientHasPendingReplies(c)) {int ae_flags &#61; AE_WRITABLE;if (aeCreateFileEvent(server.el, c->fd, ae_flags,sendReplyToClient, c) &#61;&#61; AE_ERR){freeClientAsync(c);}}}
}
可以看到&#xff0c;首先对每个客户端调用刚才介绍的writeToClient()函数进行写数据&#xff0c;如果还有数据没写完&#xff0c;那么注册写事件&#xff0c;当套接字文件描述符写就绪时&#xff0c;调用sendReplyToClient()进行剩余数据的写操作&#xff1a;
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {writeToClient(fd,privdata,1);
}
仔细想一下就明白了&#xff1a;处理完得到结果后&#xff0c;这时套接字的写缓冲区一般是空的&#xff0c;因此write()函数调用成功&#xff0c;所以就不需要注册写文件事件了。如果写缓冲区满了&#xff0c;还有数据没写完&#xff0c;此时再注册写文件事件。并且在数据写完后&#xff0c;将写事件删除&#xff1a;
int writeToClient(int fd, client *c, int handler_installed) {if (!clientHasPendingReplies(c)) {if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);}
}
注意到&#xff0c;在sendReplyToClient()函数实现中&#xff0c;第三个参数正好是1。
希望本文对你有所帮助~~如果对软件测试、接口测试、自动化测试、面试经验交流感兴趣可以加入我们。642830685&#xff0c;免费领取最新软件测试大厂面试资料和Python自动化、接口、框架搭建学习资料&#xff01;技术大牛解惑答疑&#xff0c;同行一起交流。