pthread 学习笔记 (3)

读者-写者问题

一个缓冲区,有些进程只读取里面的内容,另外有的进程会修改里面的内容。为了保持数据的一致性,如果没有进程修改内容时,任意个读进程可以同时访问缓冲区,但是同一时间只能有一个写进程可以访问缓冲区,其它写进程和读进程都不能对缓冲区进行操作。

读者-写者问题和生产者-消费者问题不同的是,后者的每个线程都要修改缓冲区的内容,所以不得不使用互斥锁来保证数据一致性,而前者有些线程是只读的,多个只读线程同时访问并不会出现数据不一致的情况,所以在实现上不必为每个线程都加一个互斥锁,而是让多个读线程可以同时访问,只有写进程的访问是互斥的。

使用互斥锁实现

下面是利用 pthread_mutex 系列函数的实现。

#include <stdio.h>
#include <pthread.h>

struct pool {
   int nr_reader;
   unsigned long long value;
   pthread_mutex_t may_write, rd_count_mutex;
};

static void* writer(void* arg)
{
   struct pool* p = arg;

   while (1) {
      pthread_mutex_lock(&p->may_write);
      ++p->value;
      printf("writer: value = %llu\n", p->value);
      pthread_mutex_unlock(&p->may_write);
   }

   return NULL;
}

static void* reader(void* arg)
{
   struct pool* p = arg;

   while (1) {
      pthread_mutex_lock(&p->rd_count_mutex);
      ++p->nr_reader;
      if (p->nr_reader == 1)
         pthread_mutex_lock(&p->may_write);
      pthread_mutex_unlock(&p->rd_count_mutex);

      printf("%d reader(s), value = %llu\n", p->nr_reader, p->value);

      pthread_mutex_lock(&p->rd_count_mutex);
      --p->nr_reader;
      if (p->nr_reader == 0)
         pthread_mutex_unlock(&p->may_write);
      pthread_mutex_unlock(&p->rd_count_mutex);
   }

   return NULL;
}

static inline void pool_init(struct pool* p)
{
   p->nr_reader = 0;
   p->value = 0;
   pthread_mutex_init(&p->may_write, NULL);
   pthread_mutex_init(&p->rd_count_mutex, NULL);
}

#define NR_READER 5

int main(void)
{
   int i, status;
   struct pool pool;
   pthread_t reader_pid[NR_READER], writer_pid;

   pool_init(&pool);

   status = pthread_create(&writer_pid, NULL, writer, &pool);
   if (status != 0)
      fprintf(stderr, "create writer failed.\n");

   for (i = 0; i < NR_READER; ++i) {
      status = pthread_create(&reader_pid[i], NULL, reader, &pool);
      if (status != 0)
         fprintf(stderr, "create reader %d failed.\n", i);
   }

   pthread_join(writer_pid, NULL);

   for (i = 0; i < NR_READER; ++i)
      pthread_join(reader_pid[i], NULL);

   return 0;
}

struct pool 中的变量 may_write 是保护 value 的锁,rd_count_mutex 是 nr_reader 的锁。

在 reader 线程中,如果是第一个读者获得 pool 的访问权,则对 may_write 加锁,阻止其它写线程访问。使用完 nr_reader 后及时对 mutex 解锁,这样其它读线程就可以同时访问 value。在读线程访问 value 期间如果有其它读线程要访问 value 则不受影响,只需将 nr_reader 加 1 即可。在读线程中直到所有线程退出访问时才会解锁 may_write,之后的写线程才能访问。

使用读写锁实现

pthread 还提供了另一种锁——读写锁来方便这种既有读又有写的问题的处理。读写锁的好处是用户不用自己维护读者的计数,还有减少进出函数的切换。

#include <stdio.h>
#include <pthread.h>

struct pool {
   unsigned long long value;
   pthread_rwlock_t rwlock;
};

static void* writer(void* arg)
{
   struct pool* p = arg;

   while (1) {
      pthread_rwlock_wrlock(&p->rwlock);
      ++p->value;
      printf("writer: value = %llu\n", p->value);
      pthread_rwlock_unlock(&p->rwlock);
   }

   return NULL;
}

static void* reader(void* arg)
{
   struct pool* p = arg;

   while (1) {
      pthread_rwlock_rdlock(&p->rwlock);
      printf("reader: value = %llu\n", p->value);
      pthread_rwlock_unlock(&p->rwlock);
   }

   return NULL;
}

static inline void pool_init(struct pool* p)
{
   p->value = 0;
   pthread_rwlock_init(&p->rwlock, NULL);
}

#define NR_READER 5

int main(void)
{
   int i, status;
   struct pool pool;
   pthread_t reader_pid[NR_READER], writer_pid;

   pool_init(&pool);

   status = pthread_create(&writer_pid, NULL, writer, &pool);
   if (status != 0)
      fprintf(stderr, "create writer failed.\n");

   for (i = 0; i < NR_READER; ++i) {
      status = pthread_create(&reader_pid[i], NULL, reader, &pool);
      if (status != 0)
         fprintf(stderr, "create reader %d failed.\n", i);
   }

   pthread_join(writer_pid, NULL);

   for (i = 0; i < NR_READER; ++i)
      pthread_join(reader_pid[i], NULL);

   return 0;
}

类似 mutex,rwlock 的构造和析构函数分别为

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
      const pthread_rwlockattr_t *restrict attr);

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

第一个参数是读写锁指针。第二个参数是锁的属性设置,如果为 NULL 则使用默认设置。

读线程获取读写锁的函数是

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

如果有写线程占有锁时读线程会阻塞,直到获取为止。

写线程获取锁的函数是

int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

如果有读线程或写线程占有锁时同样会阻塞。如果希望不阻塞在函数中则使用

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

如果获取成功则返回 0,否则返回错误信息。

使用完后释放锁使用函数

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

无论读线程还是写线程都是用这个函数。(疑问:怎么识别读线程和写线程呢?)

读者优先还是写者优先

上面的程序实现的都是读者优先的原则,即如果有读线程正在使用缓冲区,那么后续到来的读线程可以不必等待,直接访问缓冲区,写线程则需等待全部读线程完成后才能访问。如果不停地有读线程到达,那么写线程可能永远也不能访问缓冲区。而写者优先的规则是,当读线程正在访问缓冲区时有写线程到达,那么后续的读线程都必须等待,直到写线程修改完缓冲区后才能进入,而不能像读者优先那样直接访问缓冲区。

pthread 中提供的 rwlock 是读者优先的。下面的程序提升了写线程的优先级,但是还没完全做到写者优先,这与线程的调度算法有关。

#include <stdio.h>
#include <pthread.h>

struct pool {
   int nr_reader, nr_writer;
   unsigned long long value;
   pthread_mutex_t may_read, may_write, wr_count_mutex, rd_count_mutex;
};

static void* writer(void* arg)
{
   struct pool* p = arg;

   while (1) {
      pthread_mutex_lock(&p->wr_count_mutex);
      ++p->nr_writer;
      if (p->nr_writer == 1)
         pthread_mutex_lock(&p->may_read);
      pthread_mutex_unlock(&p->wr_count_mutex);

      pthread_mutex_lock(&p->may_write);
      ++p->value;
      printf("writer: value = %llu\n", p->value);
      pthread_mutex_unlock(&p->may_write);

      pthread_mutex_lock(&p->wr_count_mutex);
      --p->nr_writer;
      if (p->nr_writer == 0)
         pthread_mutex_unlock(&p->may_read);
      pthread_mutex_unlock(&p->wr_count_mutex);
   }

   return NULL;
}

static void* reader(void* arg)
{
   struct pool* p = arg;

   while (1) {
      pthread_mutex_lock(&p->may_read);

      pthread_mutex_lock(&p->rd_count_mutex);
      ++p->nr_reader;
      if (p->nr_reader == 1)
         pthread_mutex_lock(&p->may_write);
      pthread_mutex_unlock(&p->rd_count_mutex);

      pthread_mutex_unlock(&p->may_read);

      printf("%d reader(s), value = %llu\n", p->nr_reader, p->value);

      pthread_mutex_lock(&p->rd_count_mutex);
      --p->nr_reader;
      if (p->nr_reader == 0)
         pthread_mutex_unlock(&p->may_write);
      pthread_mutex_unlock(&p->rd_count_mutex);
   }

   return NULL;
}

static inline void pool_init(struct pool* p)
{
   p->nr_reader = 0;
   p->value = 0;
   pthread_mutex_init(&p->may_read, NULL);
   pthread_mutex_init(&p->may_write, NULL);
   pthread_mutex_init(&p->rd_count_mutex, NULL);
   pthread_mutex_init(&p->wr_count_mutex, NULL);
}

#define NR_READER 5
#define NR_WRITER 5

int main(void)
{
   int i, status;
   struct pool pool;
   pthread_t reader_pid[NR_READER], writer_pid[NR_WRITER];

   pool_init(&pool);

   for (i = 0; i < NR_WRITER; ++i) {
      status = pthread_create(&writer_pid[i], NULL, writer, &pool);
      if (status != 0)
         fprintf(stderr, "create writer %d failed.\n", i);
   }

   for (i = 0; i < NR_READER; ++i) {
      status = pthread_create(&reader_pid[i], NULL, reader, &pool);
      if (status != 0)
         fprintf(stderr, "create reader %d failed.\n", i);
   }

   for (i = 0; i < NR_WRITER; ++i)
      pthread_join(writer_pid[i], NULL);

   for (i = 0; i < NR_READER; ++i)
      pthread_join(reader_pid[i], NULL);

   return 0;
}

写线程通过一个互斥锁 may_read 来阻止读线程,只要有一个写线程等待,读线程都拿不到 may_read,从而无法读取。但是如果调度的时候读线程有较高的优先级,这也就成了读者优先。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注