• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Redis 服务端事件处理流程

武飞扬头像
juejin
帮助64

一、事件处理

1.1 什么是事件

Redis 为什么运行得比较快?

原因之一就是它的服务端处理程序用了事件驱动的处理方式。

服务端网络开发中,IO 的处理过程,可以把一个完整 IO 处理过程分解为一个一个小的任务,可以把这个小的任务叫做事件,处理每个小任务也叫作事件处理。比如把 IO 处理过程分为 读事件、计算事件、写事件等各种小的任务进行处理。就是分而治之的思想。

而在 Redis 服务端处理程序中,它把处理程序抽象为了 2 大类的事件进行处理:文件事件 file event时间事件 time event

  • 文件事件:

Redis 把对网络套接字操作的过程抽象为了各种文件事件。客户端与服务端通信产生的处理程序抽象为相应的文件事件,Redis 服务端通过监听并处理这些文件事件来完成各种网络操作。

  • 时间事件

Redis 服务器中的一些操作需要在给定的时间点执行,而时间事件就是 Redis 服务器对这类定时操作的抽象。

  • 网络处理模型

Redis 中的网络模型处理模式-Reactor 单线程分发模式,可以参考 服务端高性能网络IO编程模型简析www.cnblogs.com/jiujuan/p/1… 小结的单线程模式。

(单 Reactor 单线程模式 (来自《Scalable IO in Java》作者:Doug Lea))

基于 redis 3.0 分析

1.2 文件事件

1.2.1 套接字处理介绍

在上一小节,我们了解 redis 是单线的分发模式,及是 dispatch 分发模式。那 Redis 是怎么实现的呢?它用到了 epoll,IO 多路复用程序,epoll 能同时监听多个套接字变化,并根据套接字不同的动作变化来执行注册的事件处理器。

一个套接字整个过程可以分为应答-accept、读取-read、写入-write、关闭-close 等操作,而这些操作可以看作是一个一个事件,在 redis 中,可以当成文件事件,发生相应的事件时,文件处理器就会调用前面已经注册好的相应的文件事件处理器(实际就是函数)来处理。

image-20220922214825365

epoll 怎么监听多个套接字事件的变化?

看下面的这些文章,就可以了解其中的详情:

在 redis 中,对 epoll 进行了封装叫 ae_epoll.c

而事件的处理类型、处理函数接口可以在 ae.h 文件中找到。

1.2.2 文件事件处理器

Redis 为文件事件的处理编写了多个事件处理器,用于不同的套接字变化的事件处理。

  1. 连接应答处理器

networking.c/acceptTcpHandler 函数,这个处理器用于对连接服务器监听套接字的客户端进行应答,是对系统的 accept 函数的封装。

  1. 命令请求处理器-从客户端读取数据

networking.c/readQueryFromClient 函数,这个处理器负责从套接字中读入客户端发送的命令请求内容,是对系统 read 函数的封装。

  1. 命令回复处理器

networking.c/sendReplyToClient 函数,这个处理器将服务器执行后的命令回复通过套接字返回给客户端,是对系统 write 函数的封装。

1.3 时间事件

时间事件分为2类:

  • 定时事件:让一段程序在指定的时间之后执行一次。
  • 周期性事件:让一段程序每隔一段时间执行一次。

时间事件的结构,ae.h/aeTimeEvent

/* Time event structure */typedef struct aeTimeEvent {    long long id; /* time event identifier. */    long when_sec; /* seconds */    long when_ms; /* milliseconds */    aeTimeProc *timeProc;    aeEventFinalizerProc *finalizerProc;    void *clientData;    struct aeTimeEvent *next;} aeTimeEvent;

在 Redis 正常模式下,只有一个 serverCron 时间事件,函数在 redis.c/serverCron,这个时间事件函数干了很多事情。

  • 更新服务器各类统计信息,比如时间、内存占用、数据占用情况等
  • 清理数据库中的过期键值
  • 关闭和清理连接失败的客户端
  • 尝试进行 AOF 和 RDB 的持久化操作
  • 。。。。。。

Redis 以周期性的事件方式来运行 ServerCron 函数。默认平均每隔 100 毫秒运行一次,从 Redis2.8 开始,可以在 redis.conf 中修改 hz 选项来调整 serverCron 每秒执行的次数。

1.4 事件处理函数

封装事件结构体aeEventLoop

不管是文件事件还是时间事件都封装在结构体 ae.h/aeEventLoop 中:

// https://github.com/redis/redis/blob/3.0/src/ae.h#L87typedef struct aeEventLoop {    int maxfd;   /* highest file descriptor currently registered 当前注册的最大文件描述符*/    int setsize; /* max number of file descriptors tracked 当前已经追踪的最大文件描述符数*/    long long timeEventNextId; // 生成时间事件的唯一标识 ID    time_t lastTime;     /* Used to detect system clock skew 记录最后一次执行时间事件的时间*/    aeFileEvent *events; /* Registered events 存储已经注册的文件事件,它是一个数组,用 fd 做索引来访问相应事件(对应处理函数)*/    aeFiredEvent *fired; /* Fired events 已经就绪的文件事件*/    aeTimeEvent *timeEventHead; // 时间事件链表头节点,因为可能有多个时间事件,组成一个链表    int stop;  // 标识事件循环是否结束    void *apidata; /* This is used for polling API specific data 多路复用的私有数据*/    aeBeforeSleepProc *beforesleep; // 进程阻塞之前会调用的函数,事件函数运行前调用的函数,相当于事件函数运行前的钩子函数} aeEventLoop;/* File event structure 文件事件结构体*/typedef struct aeFileEvent {    int mask; /* one of AE_(READABLE|WRITABLE) */    aeFileProc *rfileProc;    aeFileProc *wfileProc;    void *clientData;} aeFileEvent;// 定义 epoll 中所发生的事件对应的处理函数(函数指针)typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);/* A fired event 已经就绪的文件事件结构体*/typedef struct aeFiredEvent {    int fd;    int mask;} aeFiredEvent;

初始化 aeEventLoop 结构体的函数 ae.c/aeCreateEventLoop(),而这个函数里有一个很重要函数 ae_epoll.c/aeApiCreate() 函数,创建 epoll 对象,是对 epoll_create 函数的封装。

其实 redis 还对 epoll 的操作做了其它封装,都在 ae_epoll.c 文件里:

  • aeApiCreate 函数创建 epoll 对象,是对 epoll_create 的封装

  • aeApiAddEvent 函数用于添加事件,是对 epoll_ctl 的封装

  • aeApiDelEvent 函数用于删除事件,是对 epoll_ctl 的封装

  • aeApiPoll 是对 epoll_wait 的封装

aeCreateEventLoop 函数:

// https://github.com/redis/redis/blob/3.0/src/ae.c#L63// 初始化事件处理器aeEventLoop *aeCreateEventLoop(int setsize) {    aeEventLoop *eventLoop;    int i;    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;        // 初始化结构体 aeEventLoop    eventLoop->setsize = setsize;    eventLoop->lastTime = time(NULL);    eventLoop->timeEventHead = NULL;    eventLoop->timeEventNextId = 0;    eventLoop->stop = 0;    eventLoop->maxfd = -1;    eventLoop->beforesleep = NULL;        // 调用 aeApiCreate 创建 epoll 对象    if (aeApiCreate(eventLoop) == -1) goto err;    /* Events with mask == AE_NONE are not set. So let's initialize the     * vector with it. */    // 初始化监听事件    for (i = 0; i < setsize; i  )        eventLoop->events[i].mask = AE_NONE;        return eventLoop;err:    if (eventLoop) {        // 释放内存        zfree(eventLoop->events);        zfree(eventLoop->fired);        zfree(eventLoop);    }    return NULL;}

事件添加函数aeApiAddEvent

ae_epoll.c/aeApiAddEvent 函数作用上面介绍了,是对 epoll_ctl 函数的封装,添加一个 epoll 事件。它是在哪里被调用的呢?在 ae.c/aeCreateFileEvent 函数中调用的。

执行 ae.c/aeCreateFileEvent 函数创建对应的文件事件,然后把这个事件存储在 aeEventLoop 结构体的 events 字段中。

  • aeCreateFileEvent 函数代码
// https://github.com/redis/redis/blob/3.0/src/ae.c#L135// 根据 mask 的值监听 fd 文件的状态变化,当 fd 可用时,执行 proc 函数int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,        aeFileProc *proc, void *clientData){    if (fd >= eventLoop->setsize) {        errno = ERANGE;        return AE_ERR;    }    aeFileEvent *fe = &eventLoop->events[fd]; // 取出注册的文件事件,fd 作为索引    if (aeApiAddEvent(eventLoop, fd, mask) == -1) // 这里调用 aeApiAddEvent 函数,把事件添加到eventLoop中        return AE_ERR;        // 根据文件处理类型(可读 或 可写),设置对应事件处理器 proc    fe->mask |= mask;    if (mask & AE_READABLE) fe->rfileProc = proc;    if (mask & AE_WRITABLE) fe->wfileProc = proc;        // 私有数据    fe->clientData = clientData;        // 如果fd大于最大的maxfd,那么就把当前fd更新最大fd数据maxfd    if (fd > eventLoop->maxfd)        eventLoop->maxfd = fd;        return AE_OK;}
// https://github.com/redis/redis/blob/3.0/src/ae_epoll.c#L73static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {    aeApiState *state = eventLoop->apidata;    struct epoll_event ee;    /* If the fd was already monitored for some event, we need a MOD     * operation. Otherwise we need an ADD operation. */    int op = eventLoop->events[fd].mask == AE_NONE ?            EPOLL_CTL_ADD : EPOLL_CTL_MOD;    ee.events = 0;    mask |= eventLoop->events[fd].mask; /* Merge old events */    if (mask & AE_READABLE) ee.events |= EPOLLIN; // 可读事件,EPOLLIN -> AE_READABLE    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 可写事件,EPOLLOUT -> AE_WRITABLE    ee.data.u64 = 0; /* avoid valgrind warning */    ee.data.fd = fd;    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;    return 0;}// https://github.com/redis/redis/blob/3.0/src/ae_epoll.c// 作为 eventLoop 结构的apidata存储typedef struct aeApiState {    int epfd; // epoll事件的文件描述符fd    struct epoll_event *events; // 事件列表,epoll_wait 返回的事件列表} aeApiState;
  • epoll_ctl 函数说明:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

  • epfd:函数 epoll_create 返回的 epoll 文件描述符fd。
  • op:要执行的操作。有几个操作的标识,EPOLL_CTL_ADD 表示添加事件;EPOLL_CTL_MOD 表示修改网络连接事件;EPOLL_CTL_DEL 表示删除事件。
  • fd:网络连接的 socket 文件描述符。
  • event:事件列表,用结构体 epoll_event 表示。

epoll_event 结构体代码:

struct epoll_event {          __uint32_t events;  epoll_data_t data;};typedef union epoll_data {  void *ptr;         int fd;         __uint32_t u32;         __uint64_t u64;   } epoll_data_t;

evnets 表示要监听的事件类型,常用的 EPOLLIN 表示文件描述符可读事件,EPOLLOUT 表示文件描述符可写事件。

data 表示与文件描述符相关联的数据。

其他 epoll 函数详细解释请参考这里:www.cnblogs.com/Anker/archi…

aeMain事件循环函数

在事件驱动程序中,一般都有事件循环(while),循环等待事件发生并处理,这个循环处理函数是 ae.c/aeMain():

// https://github.com/redis/redis/blob/3.0/src/ae.c#L450void aeMain(aeEventLoop *eventLoop) {    eventLoop->stop = 0;    while (!eventLoop->stop) {        if (eventLoop->beforesleep != NULL)            eventLoop->beforesleep(eventLoop); // 事件处理前要做的事情        aeProcessEvents(eventLoop, AE_ALL_EVENTS); // 开始处理事件    }}// https://github.com/redis/redis/blob/3.0/src/ae.c#L352// ae.c/aeProcessEvents() 函数,所有事件处理都封装在这个函数里,又进行了一个封装#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)

二、redis 启动初始化

2.1 main()入口函数

redis.c 文件中的 redis.c/main() 函数,这个函数里代码虽然很长,但是最重要的就两个函数-redis 服务器启动初始化(initServer)和事件处理循环函数(aeMain),

2.2 initServer()初始化

initServer() 函数:redis 服务器启动初始化。

initServer() 函数代码很长,截取几个重要的地方来分析下:

// https://github.com/redis/redis/blob/3.0/src/redis.c#L1766void initServer(void) {    int j;    ... ...            // 创建事件循环结构,上面有讲到过这个函数       server.el = aeCreateEventLoop(server.maxclients REDIS_EVENTLOOP_FDSET_INCR);    server.db = zmalloc(sizeof(redisDb)*server.dbnum);    /* Open the TCP listening socket for the user commands. */    // 监听端口    if (server.port != 0 &&        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)        exit(1);    ... ...                    /* Create the serverCron() time event, that's our main way to process     * background operations. */    // 创建时间事件,运行 serverCron 函数    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {        redisPanic("Can't create the serverCron time event.");        exit(1);    }    /* Create an event handler for accepting new connections in TCP and Unix     * domain sockets. */    // 创建文件事件,并注册相应的事件处理函数    for (j = 0; j < server.ipfd_count; j  ) {        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,            acceptTcpHandler,NULL) == AE_ERR) // acceptTcpHandler 接收连接时间处理函数            {                redisPanic(                    "Unrecoverable error creating server.ipfd file event.");            }    }    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); // acceptUnixHandler 本地unix处理器    ... ...}

结合上一节,Redis 服务端网络初始化过程,很重要的其实就是对 epoll IO多路复用的一个封装处理,封装抽象成了一个一个的事件处理。

等待客户端连接,创建 epoll 对象,注册相应事件,等待事件发生。

刚好对应于 epoll 的 3 个函数:

  • epoll_create:创建 epoll 对象。
  • epoll_ctl:事件注册。EPOLL_CTL_ADD,注册新的fd到epfd中;EPOLL_CTL_MOD,修改已经注册的fd的监听事件;EPOLL_CTL_DEL,从epfd中删除一个fd。【int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)】。
  • epoll_wait:等待事件发生,进行处理。

initServer小结

initServer()初始化服务器

->创建epoll(aeCreateEventLoop) -> 监听端口(listenToPort) -> 用aeCreateFileEvent()注册事件处理器acceptTcpHandler

这一切准备完毕后,等待正在的事件处理,也就是用户请求处理。

与 tcp/ip 网络编程的 socket 套接字编程处理过程差不多,见前面的文章:www.cnblogs.com/jiujuan/p/1… 第二小节。

三、事件循环处理数据aeMain函数中

就是前面讲到的 ae.c/aeMain(server.el) 函数,里面有个重要的函数 ae.c/aeProcessEvents(),处理所有已经到达的时间事件和所有已经就绪的文件事件。

aeProcessEvents 函数:

/* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * Without special flags the function sleeps until some file event * fires, or when the next time event occurs (if any). * * If flags is 0, the function does nothing and returns. * if flags has AE_ALL_EVENTS set, all the kind of events are processed. * if flags has AE_FILE_EVENTS set, file events are processed. * if flags has AE_TIME_EVENTS set, time events are processed. * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * * The function returns the number of events processed. */// https://github.com/redis/redis/blob/3.0/src/ae.c#L352int aeProcessEvents(aeEventLoop *eventLoop, int flags){    int processed = 0, numevents;    /* Nothing to do? return ASAP */    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;    /* Note that we want call select() even if there are no     * file events to process as long as we want to process time     * events, in order to sleep until the next time event is ready     * to fire. */    if (eventLoop->maxfd != -1 ||        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {        int j;        aeTimeEvent *shortest = NULL;        struct timeval tv, *tvp;        // 查询最近时间事件        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))            shortest = aeSearchNearestTimer(eventLoop);        if (shortest) {// 如果找到了,就与当前时间计算差值            long now_sec, now_ms;            /* Calculate the time missing for the nearest             * timer to fire. */            // 计算距离当前时间最近的时间事件还要多久到达            // 将计算的时间距离保存在 tv 结构中            aeGetTime(&now_sec, &now_ms);            tvp = &tv;            tvp->tv_sec = shortest->when_sec - now_sec;            if (shortest->when_ms < now_ms) {                tvp->tv_usec = ((shortest->when_ms 1000) - now_ms)*1000;                tvp->tv_sec --;            } else {                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;            }            if (tvp->tv_sec < 0) tvp->tv_sec = 0;            if (tvp->tv_usec < 0) tvp->tv_usec = 0;        } else {            /* If we have to check for events but need to return             * ASAP because of AE_DONT_WAIT we need to set the timeout             * to zero */            if (flags & AE_DONT_WAIT) { // 设置文件事件不阻塞                tv.tv_sec = tv.tv_usec = 0;                tvp = &tv;            } else {                /* Otherwise we can block */                tvp = NULL; /* wait forever */ // 一直等待,文件事件阻塞直到有文件事件到达            }        }        // 处理文件事件,阻塞时间由第二个参数决定        numevents = aeApiPoll(eventLoop, tvp);        for (j = 0; j < numevents; j  ) {            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; // 从已经就绪的文件事件数组中取出事件            int mask = eventLoop->fired[j].mask;            int fd = eventLoop->fired[j].fd;            int rfired = 0;    /* note the fe->mask & mask & ... code: maybe an already processed             * event removed an element that fired and we still didn't             * processed, so we check if the event is still valid. */            if (fe->mask & mask & AE_READABLE) {// 可读事件                rfired = 1;                fe->rfileProc(eventLoop,fd,fe->clientData,mask);            }            if (fe->mask & mask & AE_WRITABLE) {// 可写事件                if (!rfired || fe->wfileProc != fe->rfileProc)                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);            }            processed  ;        }    }    /* Check time events */    if (flags & AE_TIME_EVENTS) // 执行时间事件        processed  = processTimeEvents(eventLoop);    return processed; /* return the number of processed file/time events */}

四、参考:

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/taneiha
系列文章
更多 icon
同类精品
更多 icon
继续加载