读者-写者问题
一个缓冲区,有些进程只读取里面的内容,另外有的进程会修改里面的内容。为了保持数据的一致性,如果没有进程修改内容时,任意个读进程可以同时访问缓冲区,但是同一时间只能有一个写进程可以访问缓冲区,其它写进程和读进程都不能对缓冲区进行操作。
读者-写者问题和生产者-消费者问题不同的是,后者的每个线程都要修改缓冲区的内容,所以不得不使用互斥锁来保证数据一致性,而前者有些线程是只读的,多个只读线程同时访问并不会出现数据不一致的情况,所以在实现上不必为每个线程都加一个互斥锁,而是让多个读线程可以同时访问,只有写进程的访问是互斥的。
使用互斥锁实现
下面是利用 pthread_mutex 系列函数的实现。
#include <stdio.h>
#include <pthread.h>
struct pool {
int nr_reader;
unsigned long long value;
pthread_mutex_t may_write, rd_count_mutex;
};
static void* writer(void* arg)
{
struct pool* p = arg;
while (1) {
pthread_mutex_lock(&p->may_write);
++p->value;
printf("writer: value = %llu\n", p->value);
pthread_mutex_unlock(&p->may_write);
}
return NULL;
}
static void* reader(void* arg)
{
struct pool* p = arg;
while (1) {
pthread_mutex_lock(&p->rd_count_mutex);
++p->nr_reader;
if (p->nr_reader == 1)
pthread_mutex_lock(&p->may_write);
pthread_mutex_unlock(&p->rd_count_mutex);
printf("%d reader(s), value = %llu\n", p->nr_reader, p->value);
pthread_mutex_lock(&p->rd_count_mutex);
--p->nr_reader;
if (p->nr_reader == 0)
pthread_mutex_unlock(&p->may_write);
pthread_mutex_unlock(&p->rd_count_mutex);
}
return NULL;
}
static inline void pool_init(struct pool* p)
{
p->nr_reader = 0;
p->value = 0;
pthread_mutex_init(&p->may_write, NULL);
pthread_mutex_init(&p->rd_count_mutex, NULL);
}
#define NR_READER 5
int main(void)
{
int i, status;
struct pool pool;
pthread_t reader_pid[NR_READER], writer_pid;
pool_init(&pool);
status = pthread_create(&writer_pid, NULL, writer, &pool);
if (status != 0)
fprintf(stderr, "create writer failed.\n");
for (i = 0; i < NR_READER; ++i) {
status = pthread_create(&reader_pid[i], NULL, reader, &pool);
if (status != 0)
fprintf(stderr, "create reader %d failed.\n", i);
}
pthread_join(writer_pid, NULL);
for (i = 0; i < NR_READER; ++i)
pthread_join(reader_pid[i], NULL);
return 0;
}
struct pool 中的变量 may_write 是保护 value 的锁,rd_count_mutex 是 nr_reader 的锁。
在 reader 线程中,如果是第一个读者获得 pool 的访问权,则对 may_write 加锁,阻止其它写线程访问。使用完 nr_reader 后及时对 mutex 解锁,这样其它读线程就可以同时访问 value。在读线程访问 value 期间如果有其它读线程要访问 value 则不受影响,只需将 nr_reader 加 1 即可。在读线程中直到所有线程退出访问时才会解锁 may_write,之后的写线程才能访问。
使用读写锁实现
pthread 还提供了另一种锁——读写锁来方便这种既有读又有写的问题的处理。读写锁的好处是用户不用自己维护读者的计数,还有减少进出函数的切换。
#include <stdio.h>
#include <pthread.h>
struct pool {
unsigned long long value;
pthread_rwlock_t rwlock;
};
static void* writer(void* arg)
{
struct pool* p = arg;
while (1) {
pthread_rwlock_wrlock(&p->rwlock);
++p->value;
printf("writer: value = %llu\n", p->value);
pthread_rwlock_unlock(&p->rwlock);
}
return NULL;
}
static void* reader(void* arg)
{
struct pool* p = arg;
while (1) {
pthread_rwlock_rdlock(&p->rwlock);
printf("reader: value = %llu\n", p->value);
pthread_rwlock_unlock(&p->rwlock);
}
return NULL;
}
static inline void pool_init(struct pool* p)
{
p->value = 0;
pthread_rwlock_init(&p->rwlock, NULL);
}
#define NR_READER 5
int main(void)
{
int i, status;
struct pool pool;
pthread_t reader_pid[NR_READER], writer_pid;
pool_init(&pool);
status = pthread_create(&writer_pid, NULL, writer, &pool);
if (status != 0)
fprintf(stderr, "create writer failed.\n");
for (i = 0; i < NR_READER; ++i) {
status = pthread_create(&reader_pid[i], NULL, reader, &pool);
if (status != 0)
fprintf(stderr, "create reader %d failed.\n", i);
}
pthread_join(writer_pid, NULL);
for (i = 0; i < NR_READER; ++i)
pthread_join(reader_pid[i], NULL);
return 0;
}
类似 mutex,rwlock 的构造和析构函数分别为
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
第一个参数是读写锁指针。第二个参数是锁的属性设置,如果为 NULL 则使用默认设置。
读线程获取读写锁的函数是
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
如果有写线程占有锁时读线程会阻塞,直到获取为止。
写线程获取锁的函数是
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
如果有读线程或写线程占有锁时同样会阻塞。如果希望不阻塞在函数中则使用
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
如果获取成功则返回 0,否则返回错误信息。
使用完后释放锁使用函数
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
无论读线程还是写线程都是用这个函数。(疑问:怎么识别读线程和写线程呢?)
读者优先还是写者优先
上面的程序实现的都是读者优先的原则,即如果有读线程正在使用缓冲区,那么后续到来的读线程可以不必等待,直接访问缓冲区,写线程则需等待全部读线程完成后才能访问。如果不停地有读线程到达,那么写线程可能永远也不能访问缓冲区。而写者优先的规则是,当读线程正在访问缓冲区时有写线程到达,那么后续的读线程都必须等待,直到写线程修改完缓冲区后才能进入,而不能像读者优先那样直接访问缓冲区。
pthread 中提供的 rwlock 是读者优先的。下面的程序提升了写线程的优先级,但是还没完全做到写者优先,这与线程的调度算法有关。
#include <stdio.h>
#include <pthread.h>
struct pool {
int nr_reader, nr_writer;
unsigned long long value;
pthread_mutex_t may_read, may_write, wr_count_mutex, rd_count_mutex;
};
static void* writer(void* arg)
{
struct pool* p = arg;
while (1) {
pthread_mutex_lock(&p->wr_count_mutex);
++p->nr_writer;
if (p->nr_writer == 1)
pthread_mutex_lock(&p->may_read);
pthread_mutex_unlock(&p->wr_count_mutex);
pthread_mutex_lock(&p->may_write);
++p->value;
printf("writer: value = %llu\n", p->value);
pthread_mutex_unlock(&p->may_write);
pthread_mutex_lock(&p->wr_count_mutex);
--p->nr_writer;
if (p->nr_writer == 0)
pthread_mutex_unlock(&p->may_read);
pthread_mutex_unlock(&p->wr_count_mutex);
}
return NULL;
}
static void* reader(void* arg)
{
struct pool* p = arg;
while (1) {
pthread_mutex_lock(&p->may_read);
pthread_mutex_lock(&p->rd_count_mutex);
++p->nr_reader;
if (p->nr_reader == 1)
pthread_mutex_lock(&p->may_write);
pthread_mutex_unlock(&p->rd_count_mutex);
pthread_mutex_unlock(&p->may_read);
printf("%d reader(s), value = %llu\n", p->nr_reader, p->value);
pthread_mutex_lock(&p->rd_count_mutex);
--p->nr_reader;
if (p->nr_reader == 0)
pthread_mutex_unlock(&p->may_write);
pthread_mutex_unlock(&p->rd_count_mutex);
}
return NULL;
}
static inline void pool_init(struct pool* p)
{
p->nr_reader = 0;
p->value = 0;
pthread_mutex_init(&p->may_read, NULL);
pthread_mutex_init(&p->may_write, NULL);
pthread_mutex_init(&p->rd_count_mutex, NULL);
pthread_mutex_init(&p->wr_count_mutex, NULL);
}
#define NR_READER 5
#define NR_WRITER 5
int main(void)
{
int i, status;
struct pool pool;
pthread_t reader_pid[NR_READER], writer_pid[NR_WRITER];
pool_init(&pool);
for (i = 0; i < NR_WRITER; ++i) {
status = pthread_create(&writer_pid[i], NULL, writer, &pool);
if (status != 0)
fprintf(stderr, "create writer %d failed.\n", i);
}
for (i = 0; i < NR_READER; ++i) {
status = pthread_create(&reader_pid[i], NULL, reader, &pool);
if (status != 0)
fprintf(stderr, "create reader %d failed.\n", i);
}
for (i = 0; i < NR_WRITER; ++i)
pthread_join(writer_pid[i], NULL);
for (i = 0; i < NR_READER; ++i)
pthread_join(reader_pid[i], NULL);
return 0;
}
写线程通过一个互斥锁 may_read 来阻止读线程,只要有一个写线程等待,读线程都拿不到 may_read,从而无法读取。但是如果调度的时候读线程有较高的优先级,这也就成了读者优先。