epoll 是 Linux 内核在 2.5 引入的一个 I/O 事件通知机制,用来取代旧的 select(2) 和 poll(2)。
select() 的工作模式
从通知机制上说,select() 的方式和 epoll 的方式类似,都是阻塞在某个函数上,如果有指定的 I/O 事件发生的话函数就会返回。这样的通知机制的好处在于,在没有事件发生的时候系统可以去干别的事情。不过 select() 只告诉你有事件发生了,但是没有说清楚是哪些 fd 触发了事件通知,你需要去遍历所有的 fd 来找出哪些真正需要处理。select() 的函数原型如下:
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
fd_set 是一个 fd 的集合,参数列表中的 readfds,writefds,exceptfds 分别对应需要监控读事件,写事件和异常事件的 fd 集合,如果不需要监控某种类型的事件只需传入 NULL 即可;第一个参数 nfds 是这三个 fd 集合中的最大的 fd 加 1;最后一个参数是超时设置。
对于 fd_set 的操作由以下几个宏完成:
void FD_CLR(int fd, fd_set *set); // 将 fd 从 set 中清除
int FD_ISSET(int fd, fd_set *set); // 判断 fd 是否在 set 中
void FD_SET(int fd, fd_set *set); // 将 fd 加入 set
void FD_ZERO(fd_set *set); // 清空 set
使用 select() 来监听网络事件的大概处理流程如下:
struct timeval ts;
fd_set readfds, writefds, exceptfds;
int server_fd, clientlist[];
int nfds = MAX_FD + 1;
while (1) {
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
FD_SET(server_fd, &readfds); // add server socket fd to readfds
foreach (client in clientlist[]) {
FD_SET(client, &readfds);
}
res = select(nfds, &readfds, &writefds, &exceptfds, &ts);
if (res == -1)
continue;
if (FD_ISSET(server_fd, &readfds)) {
client = accept(server_fd, NULL, NULL);
clientlist[] = client; // add client to clientlist
}
foreach (client in clientlist[]) {
if (FD_ISSET(client, &readfds))
// do something
}
}
从伪代码中可以看到,每次循环开始的时候都需要清空 fd_set,然后把需要监听的 fd 重新加入到对应的 fd_set 中;select() 返回后,需要遍历要监听的 client,判断每个 client 是否在对应事件的 fd_set 中。如果连接很多但是每次活动的 client 很少,这样其实大部分的比较都是不需要的。
epoll 模型
与 select() 不同的地方是,epoll 除了告诉用户有事件发生外,还告诉用户到底是哪些 fd 发生了哪些事件,这样就避免了对整个监控的 fd 集合进行遍历。在一些需要监控大量 fd,但是每次只有少量 fd 触发事件的情形下能大大提高效率。
epoll 相关的函数原型都在 <sys/epoll.h> 中定义。要使用 epoll,首先需要调用
int epoll_create(int size);
创建一个 epoll。参数 size 指定要监控事件的最大值,但是从内核 2.6.8 开始只要求 size 大于 0 就可以了。如果创建成功则会返回一个与 epoll 相关的 fd,后续对 epoll 的操作都通过这个 fd 完成。
要对某个 fd 的事件(例如读或写,或者网络连接是否被关闭)进行监听,使用函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
第一个参数就是 epoll_create() 的返回值;第三个参数是要进行操作的 fd;第二个参数是操作的类型,可选的参数有
- EPOLL_CTL_ADD:将 fd 加入到 epfd 的监听列表中,要监听的事件类型由第四个参数指定;
- EPOLL_CTL_MOD:将 fd 的相关事件修改为第四个参数指定的事件;
- EPOLL_CTL_DEL:将 fd 从 epfd 中移除,这时第四个参数可以为 NULL (2.6.9 之前的内核要求不能为空)。
第四个参数是一个结构体 struct epoll_event:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epoll_data_t 是一个 union,允许用户保存一些与该 fd 相关的内容,以便在事件被触发的时候能够获得相关的上下文信息,里面的字段取值没有要求;events 是一个 bitmap,用来保存要对 fd 进行监控的事件类型:
- EPOLLIN:可以使用 read() 从 fd 读取数据;
- EPOLLOUT:可以使用 write() 往 fd 写入数据;
- EPOLLRDHUP(从 2.6.17 开始可用):对方(socket fd)关闭连接;
- EPOLLPRI:fd 有紧急的数据需要读取(?不是很明白);
- EPOLLERR:fd 出错(默认会带上这个事件,不需要显式指定);
- EPOLLHUP:fd 连接中断(有可能是网络出错或其它异常事件,默认会带上这个事件,不需要显式指定);
- EPOLLET:将 fd 的模式设为边缘触发(见后面的解释),如果不指定这个选项,默认的模式是水平触发;
- EPOLLONESHOT(2.6.2 版本后可用):对指定的事件类型只触发一次,后面就不会再触发了,用户需要使用 epoll_ctl() 来修改对应 fd 的 event mask。
当向 epoll 中添加了需要监控的 fd 及对应的事件后,就可以使用
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
来等待被监控的 fd 是否有指定的事件发生。如果有事件发生,和 fd 关联的 epoll_event 结构体会被放到第二个参数指向的数组中;第三个参数指定数组中最多可容纳多少个事件;第四个参数指定超时间隔,如果值为 -1 表示 epoll 使用阻塞模式(即一直等待,直到有事件发生才返回),值为 0 表示使用非阻塞模式,即无论有没有事件发生都立即返回。
从上面的描述,我们可以得到使用 epoll 的一般步骤为:
epfd = epoll_create(MAX_EVENTS);
event.data.fd = fd;
event.events = EPOLLIN; // and other flag(s)
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
while (1) {
nr_fds = epoll_wait(epfd, eventlist, MAX_EVENTS, -1);
for (i = 0; i < nr_fds; ++i)
// do something with eventlist[i]
}
当不需要 epoll 的时候使用 close() 来销毁,这个和一般的文件操作是一样的。
epoll 的工作模式
系统调用 read() 有两种模式,分别是阻塞模式和非阻塞模式。默认使用的是阻塞模式,即调用 read() 的时候会阻塞,直到读到数据或出错才会返回,如果没有数据可读函数就会一直等待;另一种模式是非阻塞模式,即读取数据的时候无论有没有数据都立即返回,调用者通过 errno 的值判断读取是否成功。epoll 也有两种工作模式:水平触发(Level-Triggered,LT)和边缘触发(Edge-Triggered,ET),前者可用于阻塞和非阻塞模式,而后者只能用于非阻塞模式。参考资料 [2] 中举了个例子说明这两种工作模式的区别。
假设有一个管道,在一端写入数据,另一端读取数据,下面的事情依次发生:
- 向 epoll 添加了一个新的 fd;
- 其中一个 fd 的一端写入了 2 kB 的数据;
- epoll_wait() 监控到了这个事件并返回对应的 epoll_event;
- 从 fd 的读取端读了 1 kB 数据;
- 再次调用 epoll_wait()。
如果使用了边缘触发,在第 5 步可能会被阻塞(状态并没有发生变化,仍然处于有数据的状态),这样尽管 fd 中还有数据没有读完,但是 epoll_wait() 并不会再次触发 fd;如果使用水平触发,则在第 5 步调用 epoll_wait() 还会继续返回 fd 对应的 epoll_event(因为满足“可读”这个条件,所以事件还会被触发)。或者可以这么理解:边缘触发是根据边界条件来决定是否触发事件(即只有从无数据变为有数据这样一个跃迁状态的变化才会触发事件),而水平触发则是根据数据量的变化(假设数据的水平线是 0,如果未读数据量大于 0 则超出水平线,这时就要触发事件)来决定。
由于边缘触发只需要发送一次通知,不需要多次触发 epoll_wait(),效率较高。要使用边缘触发,建议的做法是把要监控的 fd 设置为非阻塞模式,这样可以避免在读数据的时候因为无数据可读而阻塞,从而影响了 epoll_wait() 返回的其它事件的处理;另外在调用 read()/write() 时要判断 errno 为 EAGAIN(表示无数据可读或不能写入数据)时才停止读写。
其它
如果加入 epoll 的 fd 不是通过 dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2) 复制的,当这个 fd 被关闭后会从 epoll 中删除。epoll 不是根据 fd 而是根据底层的文件描述符来标识打开的文件(参考资料 [2] 中最后的 FAQ 有说明)。只有当这个文件描述符的引用计数为 0 后才可能从 epoll 中删除。
最后是一个使用 epoll 实现的服务端例子,使用了非阻塞模式,将收到的数据打印出来。
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/epoll.h>
#define MAX_EVENTS 10
static int tcp_server_init(const char* addr, unsigned port)
{
int fd, option;
char buf[8];
struct addrinfo hints, *res;
if (!addr)
addr = "0.0.0.0";
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
sprintf(buf, "%d", port);
fd = getaddrinfo(addr, buf, &hints, &res);
if (fd != 0) {
fprintf(stderr, "%s: getaddrinfo() error: %s\n",
__func__, gai_strerror(fd));
return -1;
}
fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (fd == -1) {
fprintf(stderr, "%s: socket() error: %s\n",
__func__, strerror(errno));
goto err1;
}
option = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(int));
if (bind(fd, res->ai_addr, res->ai_addrlen) != 0) {
fprintf(stderr, "%s: bind() error: %s\n",
__func__, strerror(errno));
goto err2;
}
if (listen(fd, 0) == -1) {
fprintf(stderr, "%s: listen() error: %s\n",
__func__, strerror(errno));
goto err2;
}
freeaddrinfo(res);
return fd;
err2:
close(fd);
err1:
freeaddrinfo(res);
return -1;
}
static inline void setnonblocking(int sockfd)
{
int opt;
opt = fcntl(sockfd, F_GETFL);
if (opt < 0) {
fprintf(stderr, "fcntl(F_GETFL) failed.\n");
return;
}
opt |= O_NONBLOCK;
if (fcntl(sockfd, F_SETFL, opt) < 0)
fprintf(stderr, "fcntl(F_SETFL) failed.\n");
}
int main(void)
{
struct epoll_event ev, events[MAX_EVENTS];
int sockfd, efd;
sockfd = tcp_server_init(NULL, 12345);
if (sockfd == -1) {
perror("tcp_server_init");
return 0;
}
fprintf(stderr, "server fd = %d.\n", sockfd);
efd = epoll_create(MAX_EVENTS);
if (efd == -1) {
perror("epoll_create");
goto end;
}
fprintf(stderr, "epoll_create ok, epollfd = %d.\n", efd);
ev.events = EPOLLIN;
ev.data.fd = sockfd;
if (epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
perror("epoll_ctl");
goto err;
}
fprintf(stderr, "epoll ctl ok.\n");
while (1) {
int nfds, i;
nfds = epoll_wait(efd, events, MAX_EVENTS, -1);
fprintf(stderr, "\nepoll_wait returns %d event(s)\n", nfds);
for (i = 0; i < nfds; ++i) {
if (events[i].data.fd == sockfd) {
int fd = accept(sockfd, NULL, NULL);
if (fd == -1) {
perror("accept");
goto err;
}
setnonblocking(fd);
ev.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLET;
ev.data.fd = fd;
if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev) == -1) {
perror("epoll_ctl");
goto err;
}
fprintf(stderr, "get client fd = %d.\n", fd);
} else if ((events[i].events & EPOLLHUP) || (events[i].events & EPOLLRDHUP)) {
fprintf(stderr, "client disconnected. fd %d removed from epoll\n", events[i].data.fd);
close(events[i].data.fd);
} else {
int nbytes;
int wanted = 5;
char buf[BUFSIZ];
nbytes = read(events[i].data.fd, buf, wanted);
while (nbytes > 0) {
fprintf(stderr, "get data -> ");
buf[nbytes] = '\n';
write(2, buf, nbytes + 1);
nbytes = read(events[i].data.fd, buf, wanted);
}
if (nbytes == -1) {
if (errno == EAGAIN)
fprintf(stderr, "no more data\n");
else
fprintf(stderr, "errno = %d.\n", errno);
} else if (nbytes == 0) {
fprintf(stderr, "client disconnected. fd %d removed from epoll\n", events[i].data.fd);
close(events[i].data.fd);
} else {
fprintf(stderr, "nbytes = %d, errno = %d.\n", nbytes, errno);
}
}
}
}
err:
close(efd);
end:
close(sockfd);
return 0;
}
参考资料
[1] select(2)
[2] epoll(7)