Linux 异步 IO 之 POSIX AIO

先来看一个简单的读写文件程序,程序读取源文件,然后把内容打印到屏幕上:

#include <unistd.h>
#include <fcntl.h>

#define BUFSIZE 4096

int main(void)
{
    int fd, nbytes;
    char buf[BUFSIZE];

    fd = open(__FILE__, O_RDONLY);
    while ((nbytes = read(fd, buf, BUFSIZE)) > 0)
        write(1, buf, nbytes);

    close(fd);
    return 0;
}

在这个例子中的 read()/write() 都是同步阻塞 IO,即如果没有数据可读或写入未完成时当前线程会一直等待,直到读写操作完成或出错才会返回。

同步 IO 是指,调用者发起读操作后需要等待数据拷到指定的缓冲区后才能继续往下执行。与同步 IO 相对的是异步 IO,即读操作发起后并不是立即被执行,而是会被放到任务队列中等待调度器调度,任务完成后再以某种方式(一般是回调函数,或者信号)通知调用者。发起读操作后,调用者可以继续往下执行其它操作。同步与异步的一个区别是,前者在读写数据的过程中调用者不能做其它事情,而后者的读写操作则是由操作系统来完成的,在读数据(把数据从其它地方拷到指定的缓冲区中)期间调用者可以做其它事情。

阻塞 IO 是指如果没有数据可读或者写入未完成是,调用者会一直等待,直到有数据可读或者写操作完成。类似的,与阻塞 IO 对应的是非阻塞 IO,即发起读操作后函数不管是否读到数据都会立即返回,调用者通过函数返回值判断调用是否成功。

POSIX AIO

先来看看 POSIX AIO(参考资料 [1])。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <aio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>

#define BUFSIZE 8192

int main(void)
{
    int fd, ret;
    struct aiocb my_aiocb;

    fd = open(__FILE__, O_RDONLY);
    if (fd < 0) {
        perror("open");
        return -1;
    }

    memset(&my_aiocb, 0, sizeof(struct aiocb));

    my_aiocb.aio_buf = malloc(BUFSIZE);
    if (!my_aiocb.aio_buf) {
        perror("malloc");
        goto end;
    }

    my_aiocb.aio_fildes = fd;
    my_aiocb.aio_nbytes = BUFSIZE;
    my_aiocb.aio_offset = 0;

    ret = aio_read(&my_aiocb);
    if (ret < 0) {
        perror("aio_read");
        goto end;
    }

    while (aio_error(&my_aiocb) == EINPROGRESS);

    if ((ret = aio_return(&my_aiocb)) > 0)
        printf("read %d bytes.\n", ret);
    else
        perror("aio_read");

    free((void*)(my_aiocb.aio_buf));
end:
    close(fd);
    return ret;
}

其中主要的结构体 struct aiocb(参考资料 [2]):

struct aiocb {
    /* The order of these fields is implementation-dependent */

    int             aio_fildes;     /* File descriptor */
    off_t           aio_offset;     /* File offset */
    volatile void*  aio_buf;        /* Location of buffer */
    size_t          aio_nbytes;     /* Length of transfer */
    int             aio_reqprio;    /* Request priority */
    struct sigevent aio_sigevent;   /* Notification method */
    int             aio_lio_opcode; /* Operation to be performed; lio_listio() only */

    /* Various implementation-internal fields not shown */
};

其中 aio_fildes 是对应的文件 fd;aio_offset 是读取文件的起始位置;aio_buf 是存放文件内容的 buffer; aio_nbytes 指定读取的字节数;aio_reqprio 指定读操作的优先级(因为 POSIX AIO 是通过线程模拟的,其实就是设置线程的优先级);aio_sigevent 说明读写操作完成后的通知方式,可能的取值为 SIGEV_NONE,SIGEV_SIGNAL 和 SIGEV_THREAD;aio_lio_opcode 说明 listio 的操作类型。这些选项在后面的例子中详细介绍。

设置好相应的值后就可以调用

int aio_read(struct aiocb *aiocbp);

发起异步读操作。发起异步读操作后 aio_read() 会立即返回,如果返回值小于 0 表示失败,可以通过 errno 获取失败原因。类似地,要发起异步写操作可以调用函数

int aio_write(struct aiocb *aiocbp);

发起操作后可以调用

int aio_error(const struct aiocb *aiocbp);

获取请求的状态,返回值为 0 表示操作成功,ECALNCELED 表示操作已被取消,EINPROGRESS 表示操作正在进行中。在上面的程序中通过一个循环判断是否完成,做法有点粗暴,还可以使用

int aio_suspend(const struct aiocb * const aiocb_list[],
                int nitems, const struct timespec *timeout);

来阻塞当前线程。其中第一个参数是要等待完成的请求,第二个参数是要等待的请求个数,第三个参数指定超时间隔(如果为 NULL 表示没有时间限制)。只要 aiocb_list 中有一个操作完成了(不管成功还是失败)函数都会返回,或者收到请求完成的信号,或者是在指定的时间内没有请求完成函数也会返回。

操作完成后调用

ssize_t aio_return(struct aiocb *aiocbp);

获取实际读取的字节数。

如果需要取消某次操作,可以调用函数

int aio_cancel(int fd, struct aiocb *aiocbp);

如果有多次读写请求,可以使用

int lio_listio(int mode, struct aiocb *const aiocb_list[],
               int nitems, struct sigevent *sevp);

来一次发起多个请求提升性能:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <aio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>

#define BUFSIZE 8192
#define NR_ITEM 2

static inline int init_aiocb(struct aiocb** cb, int fd)
{
    struct aiocb* res;

    res = malloc(sizeof(struct aiocb));
    if (!res) {
        perror("malloc");
        return -1;
    }

    res->aio_buf = malloc(BUFSIZE);
    if (!res->aio_buf) {
        perror("malloc");
        free(res);
        return -1;
    }

    res->aio_fildes = fd;
    res->aio_nbytes = BUFSIZE;
    res->aio_offset = 0;
    res->aio_lio_opcode = LIO_READ;

    *cb = res;
    return 0;
}

static inline void destroy_aiocblist(struct aiocb* cblist[], int nitems)
{
    int i;
    for (i = 0; i < nitems; ++i) {
        if (cblist[i]) {
            free((void*)(cblist[i]->aio_buf));
            free(cblist[i]);
            cblist[i] = NULL;
        }
    }
}

int main(void)
{
    int i, nitems, fd, ret = -1;
    struct aiocb* my_aiocb[NR_ITEM];

    fd = open(__FILE__, O_RDONLY);
    if (fd < 0) {
        perror("open");
        return -1;
    }

    memset(&my_aiocb, 0, sizeof(struct aiocb*) * NR_ITEM);
    for (i = 0; i < NR_ITEM; ++i) {
        if (init_aiocb(&my_aiocb[i], fd) != 0)
            goto end;
    }

    ret = lio_listio(LIO_NOWAIT, my_aiocb, NR_ITEM, NULL);
    if (ret < 0) {
        perror("aio_read");
        goto end;
    }

    nitems = NR_ITEM;
    while (nitems > 0) {
        ret = aio_suspend((const struct aiocb* const*)my_aiocb, NR_ITEM, NULL);
        if (ret != 0) {
            perror("aio_suspend");
            goto end;
        }

        for (i = 0; i < NR_ITEM; ++i) {
            if (!my_aiocb[i])
                continue;

            ret = aio_error(my_aiocb[i]);
            if (ret == EINPROGRESS)
                continue;

            if (ret == ECANCELED)
                goto next;

            if ((ret = aio_return(my_aiocb[i])) > 0)
                printf("read %d bytes.\n", ret);
            else
                perror("aio_read");

next:
            free((void*)(my_aiocb[i]->aio_buf));
            free(my_aiocb[i]);
            my_aiocb[i] = NULL;
            --nitems;
        }
    }

    ret = 0;
end:
    destroy_aiocblist(my_aiocb, NR_ITEM);
    close(fd);
    return ret;
}

其中 lio_listio() 的第一个参数的取值可能是 LIO_WAIT 或 LIO_NOWAIT,如果是 LIO_WAIT 表示等到所有请求都完成后 lio_listio() 才返回,如果是 LIO_NOWAIT 表示请求提交后立即返回;第二个参数是要等待的请求列表;第三个参数是要列表长度(是整个列表的长度,不是有效请求的个数,lio_listio() 会忽略为 NULL 的项);第四个参数是当所有请求完成后的通知方式,如果为空则没有通知。

请求完成时有两种通知方式:信号通知和回调函数。信号通知的方式是,设置在请求完成时发出一个信号,并且设置对应信号的触发函数;回调函数是当请求完成时由另一个线程调用注册的回调函数。

先来看看信号通知的方法:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <aio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>

#define BUFSIZE 8192

static void aio_completion_handler(int signo, siginfo_t* info, void* context)
{
    struct aiocb* req;

    if (info->si_signo == SIGIO) {
        int ret;
        req = (struct aiocb*)(info->si_value.sival_ptr);

        ret = aio_error(req);
        if (ret == 0) {
            ret = aio_return(req);
            printf("read %d bytes.\n", ret);
        } else if (ret != EINPROGRESS) {
            perror("aio_read");
        }
    }
}

int main(void)
{
    int fd, ret;
    struct aiocb my_aiocb;
    struct sigaction sig_act;

    fd = open(__FILE__, O_RDONLY);
    if (fd < 0) {
        perror("open");
        return -1;
    }

    memset(&my_aiocb, 0, sizeof(struct aiocb));

    my_aiocb.aio_buf = malloc(BUFSIZE);
    if (!my_aiocb.aio_buf) {
        perror("malloc");
        goto end;
    }

    my_aiocb.aio_fildes = fd;
    my_aiocb.aio_nbytes = BUFSIZE;
    my_aiocb.aio_offset = 0;
    my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL; /* 设置触发方式为信号通知 */
    my_aiocb.aio_sigevent.sigev_signo = SIGIO; /* 当操作完成时发出 SIGIO 信号 */
    my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;

    sigemptyset(&sig_act.sa_mask);
    sig_act.sa_flags = SA_SIGINFO;
    sig_act.sa_sigaction = aio_completion_handler;
    ret = sigaction(SIGIO, &sig_act, NULL); /* 当发出 SIGIO 信号时触发的函数 */
    if (ret != 0) {
        perror("sigaction");
        goto end;
    }

    ret = aio_read(&my_aiocb);
    if (ret < 0) {
        perror("aio_read");
        goto end;
    }

    while (1);

    free((void*)(my_aiocb.aio_buf));
end:
    close(fd);
    return ret;
}

这个程序和第一个程序相比多了设置 struct aiocb 中的 aio_sigevent 部分(这里发出的信号不一定得是 SIGIO),还有调用 sigaction() 设置对应信号的触发函数。

使用线程回调函数的设置和信号方式差不多:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <aio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>

#define BUFSIZE 8192

static void aio_completion_handler(union sigval sig)
{
    int ret;
    struct aiocb* req = sig.sival_ptr;

    ret = aio_error(req);
    if (ret == 0) {
        ret = aio_return(req);
        printf("read %d bytes.\n", ret);
    } else if (ret != EINPROGRESS) {
        perror("aio_read");
    }
}

int main(void)
{
    int fd, ret;
    struct aiocb my_aiocb;

    fd = open(__FILE__, O_RDONLY);
    if (fd < 0) {
        perror("open");
        return -1;
    }

    memset(&my_aiocb, 0, sizeof(struct aiocb));

    my_aiocb.aio_buf = malloc(BUFSIZE);
    if (!my_aiocb.aio_buf) {
        perror("malloc");
        goto end;
    }

    my_aiocb.aio_fildes = fd;
    my_aiocb.aio_nbytes = BUFSIZE;
    my_aiocb.aio_offset = 0;
    my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD; /* 设置触发方式为线程回调 */
    my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;
    my_aiocb.aio_sigevent.sigev_notify_function = aio_completion_handler;

    ret = aio_read(&my_aiocb);
    if (ret < 0) {
        perror("aio_read");
        goto end;
    }

    while (1);

    free((void*)(my_aiocb.aio_buf));
end:
    close(fd);
    return ret;
}

参考资料

[1] 使用异步 I/O 大大提高应用程序的性能
[2] aio(7)

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注