Linux 异步 IO 之 epoll

epoll 是 Linux 内核在 2.5 引入的一个 I/O 事件通知机制,用来取代旧的 select(2) 和 poll(2)。

select() 的工作模式

从通知机制上说,select() 的方式和 epoll 的方式类似,都是阻塞在某个函数上,如果有指定的 I/O 事件发生的话函数就会返回。这样的通知机制的好处在于,在没有事件发生的时候系统可以去干别的事情。不过 select() 只告诉你有事件发生了,但是没有说清楚是哪些 fd 触发了事件通知,你需要去遍历所有的 fd 来找出哪些真正需要处理。select() 的函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

fd_set 是一个 fd 的集合,参数列表中的 readfds,writefds,exceptfds 分别对应需要监控读事件,写事件和异常事件的 fd 集合,如果不需要监控某种类型的事件只需传入 NULL 即可;第一个参数 nfds 是这三个 fd 集合中的最大的 fd 加 1;最后一个参数是超时设置。

对于 fd_set 的操作由以下几个宏完成:

void FD_CLR(int fd, fd_set *set); // 将 fd 从 set 中清除
int  FD_ISSET(int fd, fd_set *set); // 判断 fd 是否在 set 中
void FD_SET(int fd, fd_set *set); // 将 fd 加入 set
void FD_ZERO(fd_set *set); // 清空 set

使用 select() 来监听网络事件的大概处理流程如下:

struct timeval ts;
fd_set readfds, writefds, exceptfds;
int server_fd, clientlist[];
int nfds = MAX_FD + 1;

while (1) {
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_ZERO(&exceptfds);

    FD_SET(server_fd, &readfds); // add server socket fd to readfds
    foreach (client in clientlist[]) {
        FD_SET(client, &readfds);
    }

    res = select(nfds, &readfds, &writefds, &exceptfds, &ts);
    if (res == -1)
        continue;

    if (FD_ISSET(server_fd, &readfds)) {
        client = accept(server_fd, NULL, NULL);
        clientlist[] = client; // add client to clientlist
    }

    foreach (client in clientlist[]) {
        if (FD_ISSET(client, &readfds))
            // do something
    }
}

从伪代码中可以看到,每次循环开始的时候都需要清空 fd_set,然后把需要监听的 fd 重新加入到对应的 fd_set 中;select() 返回后,需要遍历要监听的 client,判断每个 client 是否在对应事件的 fd_set 中。如果连接很多但是每次活动的 client 很少,这样其实大部分的比较都是不需要的。

epoll 模型

与 select() 不同的地方是,epoll 除了告诉用户有事件发生外,还告诉用户到底是哪些 fd 发生了哪些事件,这样就避免了对整个监控的 fd 集合进行遍历。在一些需要监控大量 fd,但是每次只有少量 fd 触发事件的情形下能大大提高效率。

epoll 相关的函数原型都在 <sys/epoll.h> 中定义。要使用 epoll,首先需要调用

int epoll_create(int size);

创建一个 epoll。参数 size 指定要监控事件的最大值,但是从内核 2.6.8 开始只要求 size 大于 0 就可以了。如果创建成功则会返回一个与 epoll 相关的 fd,后续对 epoll 的操作都通过这个 fd 完成。

要对某个 fd 的事件(例如读或写,或者网络连接是否被关闭)进行监听,使用函数

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

第一个参数就是 epoll_create() 的返回值;第三个参数是要进行操作的 fd;第二个参数是操作的类型,可选的参数有

  • EPOLL_CTL_ADD:将 fd 加入到 epfd 的监听列表中,要监听的事件类型由第四个参数指定;
  • EPOLL_CTL_MOD:将 fd 的相关事件修改为第四个参数指定的事件;
  • EPOLL_CTL_DEL:将 fd 从 epfd 中移除,这时第四个参数可以为 NULL (2.6.9 之前的内核要求不能为空)。

第四个参数是一个结构体 struct epoll_event:

typedef union epoll_data {
    void        *ptr;
    int         fd;
    uint32_t    u32;
    uint64_t    u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;    /* Epoll events */
    epoll_data_t data;      /* User data variable */
};

epoll_data_t 是一个 union,允许用户保存一些与该 fd 相关的内容,以便在事件被触发的时候能够获得相关的上下文信息,里面的字段取值没有要求;events 是一个 bitmap,用来保存要对 fd 进行监控的事件类型:

  • EPOLLIN:可以使用 read() 从 fd 读取数据;
  • EPOLLOUT:可以使用 write() 往 fd 写入数据;
  • EPOLLRDHUP(从 2.6.17 开始可用):对方(socket fd)关闭连接;
  • EPOLLPRI:fd 有紧急的数据需要读取(?不是很明白);
  • EPOLLERR:fd 出错(默认会带上这个事件,不需要显式指定);
  • EPOLLHUP:fd 连接中断(有可能是网络出错或其它异常事件,默认会带上这个事件,不需要显式指定);
  • EPOLLET:将 fd 的模式设为边缘触发(见后面的解释),如果不指定这个选项,默认的模式是水平触发;
  • EPOLLONESHOT(2.6.2 版本后可用):对指定的事件类型只触发一次,后面就不会再触发了,用户需要使用 epoll_ctl() 来修改对应 fd 的 event mask。

当向 epoll 中添加了需要监控的 fd 及对应的事件后,就可以使用

int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout);

来等待被监控的 fd 是否有指定的事件发生。如果有事件发生,和 fd 关联的 epoll_event 结构体会被放到第二个参数指向的数组中;第三个参数指定数组中最多可容纳多少个事件;第四个参数指定超时间隔,如果值为 -1 表示 epoll 使用阻塞模式(即一直等待,直到有事件发生才返回),值为 0 表示使用非阻塞模式,即无论有没有事件发生都立即返回。

从上面的描述,我们可以得到使用 epoll 的一般步骤为:

epfd = epoll_create(MAX_EVENTS);

event.data.fd = fd;
event.events = EPOLLIN; // and other flag(s)
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);

while (1) {
    nr_fds = epoll_wait(epfd, eventlist, MAX_EVENTS, -1);
    for (i = 0; i < nr_fds; ++i)
        // do something with eventlist[i]
}

当不需要 epoll 的时候使用 close() 来销毁,这个和一般的文件操作是一样的。

epoll 的工作模式

系统调用 read() 有两种模式,分别是阻塞模式和非阻塞模式。默认使用的是阻塞模式,即调用 read() 的时候会阻塞,直到读到数据或出错才会返回,如果没有数据可读函数就会一直等待;另一种模式是非阻塞模式,即读取数据的时候无论有没有数据都立即返回,调用者通过 errno 的值判断读取是否成功。epoll 也有两种工作模式:水平触发(Level-Triggered,LT)和边缘触发(Edge-Triggered,ET),前者可用于阻塞和非阻塞模式,而后者只能用于非阻塞模式。参考资料 [2] 中举了个例子说明这两种工作模式的区别。

假设有一个管道,在一端写入数据,另一端读取数据,下面的事情依次发生:

  1. 向 epoll 添加了一个新的 fd;
  2. 其中一个 fd 的一端写入了 2 kB 的数据;
  3. epoll_wait() 监控到了这个事件并返回对应的 epoll_event;
  4. 从 fd 的读取端读了 1 kB 数据;
  5. 再次调用 epoll_wait()。

如果使用了边缘触发,在第 5 步可能会被阻塞(状态并没有发生变化,仍然处于有数据的状态),这样尽管 fd 中还有数据没有读完,但是 epoll_wait() 并不会再次触发 fd;如果使用水平触发,则在第 5 步调用 epoll_wait() 还会继续返回 fd 对应的 epoll_event(因为满足“可读”这个条件,所以事件还会被触发)。或者可以这么理解:边缘触发是根据边界条件来决定是否触发事件(即只有从无数据变为有数据这样一个跃迁状态的变化才会触发事件),而水平触发则是根据数据量的变化(假设数据的水平线是 0,如果未读数据量大于 0 则超出水平线,这时就要触发事件)来决定。

由于边缘触发只需要发送一次通知,不需要多次触发 epoll_wait(),效率较高。要使用边缘触发,建议的做法是把要监控的 fd 设置为非阻塞模式,这样可以避免在读数据的时候因为无数据可读而阻塞,从而影响了 epoll_wait() 返回的其它事件的处理;另外在调用 read()/write() 时要判断 errno 为 EAGAIN(表示无数据可读或不能写入数据)时才停止读写。

其它

如果加入 epoll 的 fd 不是通过 dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2) 复制的,当这个 fd 被关闭后会从 epoll 中删除。epoll 不是根据 fd 而是根据底层的文件描述符来标识打开的文件(参考资料 [2] 中最后的 FAQ 有说明)。只有当这个文件描述符的引用计数为 0 后才可能从 epoll 中删除。

最后是一个使用 epoll 实现的服务端例子,使用了非阻塞模式,将收到的数据打印出来。

#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

#include <fcntl.h>
#include <netdb.h>
#include <sys/epoll.h>

#define MAX_EVENTS 10

static int tcp_server_init(const char* addr, unsigned port)
{
    int fd, option;
    char buf[8];
    struct addrinfo hints, *res;

    if (!addr)
        addr = "0.0.0.0";

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;
    sprintf(buf, "%d", port);

    fd = getaddrinfo(addr, buf, &hints, &res);
    if (fd != 0) {
        fprintf(stderr, "%s: getaddrinfo() error: %s\n",
                __func__, gai_strerror(fd));
        return -1;
    }

    fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (fd == -1) {
        fprintf(stderr, "%s: socket() error: %s\n",
                __func__, strerror(errno));
        goto err1;
    }

    option = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(int));

    if (bind(fd, res->ai_addr, res->ai_addrlen) != 0) {
        fprintf(stderr, "%s: bind() error: %s\n",
                __func__, strerror(errno));
        goto err2;
    }

    if (listen(fd, 0) == -1) {
        fprintf(stderr, "%s: listen() error: %s\n",
                __func__, strerror(errno));
        goto err2;
    }

    freeaddrinfo(res);
    return fd;

err2:
    close(fd);
err1:
    freeaddrinfo(res);
    return -1;
}

static inline void setnonblocking(int sockfd)
{
    int opt;

    opt = fcntl(sockfd, F_GETFL);
    if (opt < 0) {
        fprintf(stderr, "fcntl(F_GETFL) failed.\n");
        return;
    }

    opt |= O_NONBLOCK;
    if (fcntl(sockfd, F_SETFL, opt) < 0)
        fprintf(stderr, "fcntl(F_SETFL) failed.\n");
}

int main(void)
{
    struct epoll_event ev, events[MAX_EVENTS];
    int sockfd, efd;

    sockfd = tcp_server_init(NULL, 12345);
    if (sockfd == -1) {
        perror("tcp_server_init");
        return 0;
    }
    fprintf(stderr, "server fd = %d.\n", sockfd);


    efd = epoll_create(MAX_EVENTS);
    if (efd == -1) {
        perror("epoll_create");
        goto end;
    }
    fprintf(stderr, "epoll_create ok, epollfd = %d.\n", efd);

    ev.events = EPOLLIN;
    ev.data.fd = sockfd;
    if (epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
        perror("epoll_ctl");
        goto err;
    }
    fprintf(stderr, "epoll ctl ok.\n");

    while (1) {
        int nfds, i;

        nfds = epoll_wait(efd, events, MAX_EVENTS, -1);
        fprintf(stderr, "\nepoll_wait returns %d event(s)\n", nfds);

        for (i = 0; i < nfds; ++i) {
            if (events[i].data.fd == sockfd) {
                int fd = accept(sockfd, NULL, NULL);
                if (fd == -1) {
                    perror("accept");
                    goto err;
                }
                setnonblocking(fd);

                ev.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLET;
                ev.data.fd = fd;
                if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev) == -1) {
                    perror("epoll_ctl");
                    goto err;
                }

                fprintf(stderr, "get client fd = %d.\n", fd);
            } else if ((events[i].events & EPOLLHUP) || (events[i].events & EPOLLRDHUP)) {
                fprintf(stderr, "client disconnected. fd %d removed from epoll\n", events[i].data.fd);
                close(events[i].data.fd);
            } else {
                int nbytes;
                int wanted = 5;
                char buf[BUFSIZ];

                nbytes = read(events[i].data.fd, buf, wanted);
                while (nbytes > 0) {
                    fprintf(stderr, "get data -> ");
                    buf[nbytes] = '\n';
                    write(2, buf, nbytes + 1);
                    nbytes = read(events[i].data.fd, buf, wanted);
                }

                if (nbytes == -1) {
                    if (errno == EAGAIN)
                        fprintf(stderr, "no more data\n");
                    else
                        fprintf(stderr, "errno = %d.\n", errno);
                } else if (nbytes == 0) {
                    fprintf(stderr, "client disconnected. fd %d removed from epoll\n", events[i].data.fd);
                    close(events[i].data.fd);
                } else {
                    fprintf(stderr, "nbytes = %d, errno = %d.\n", nbytes, errno);
                }
            }
        }
    }

err:
    close(efd);
end:
    close(sockfd);

    return 0;
}

参考资料

[1] select(2)
[2] epoll(7)

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注