pthread 学习笔记 (2)

互斥和同步

互斥(mutual exclusion,缩写mutex)是指一段区域在同一时间内只能有一个线程对其进行操作,否则会造成不一致的情况,这段区域叫做临界区。互斥只要求同一时间内只能有一个线程进行访问,但是线程之间的访问顺序可以是任意的;同步要求线程之间的访问有一定的顺序,并且一般都要求线程之间互斥访问(如果不修改临界区的值的话可以允许多个只读线程同时访问)。

生产者-消费者问题

同步和互斥的一个经典例子是生产者-消费者问题。假设一个缓冲区的大小为 N,如果缓冲区还没满,生产者每次可以往缓冲区里放入一个物品;如果缓冲区非空,消费者每次可以从缓冲区里取出一个物品。缓冲区是生产者和消费者共用的,同一时间内只能有一个线程可以对缓冲区进行修改,否则可能会出现错误。为了保证对缓冲区修改的原子性(即访问过程中不能被别的线程打断),可以对缓冲区加一个互斥锁(mutex lock)。当线程要访问缓冲区时需要先检查锁是否被其它线程占用,如果是的话就必须等待正在访问的线程释放;然后它占用互斥锁,防止别的线程在自己修改的过程中访问缓冲区;访问完毕后释放锁,让其它线程可以进行访问。

只有一个生产者和一个消费者

#include <stdio.h>
#include <pthread.h>

struct pool {
   pthread_mutex_t mutex;
#define MAX_NUM 5
   int num;
};

/* mutex should be held by the caller when one of the following four inline functions is called */

static inline int is_full(struct pool* pool)
{
   return (pool->num == MAX_NUM);
}

static inline int is_empty(struct pool* pool)
{
   return (pool->num == 0);
}

static inline void put(struct pool* pool)
{
   ++pool->num;
}

static inline void get(struct pool* pool)
{
   --pool->num;
}

void init(struct pool* pool)
{
   pool->num = 0;
   pthread_mutex_init(&pool->mutex, NULL);
}

static void* consumer(void* arg)
{
   struct pool* pool = (struct pool*)arg;

   while (1) {
      pthread_mutex_lock(&pool->mutex);
      if (is_empty(pool))
         printf("get fail.\n");
      else {
         get(pool);
         printf("get successfully. available: %d\n", pool->num);
      }
      pthread_mutex_unlock(&pool->mutex);
   }

   return arg;
}

static void* producer(void* arg)
{
   struct pool* pool = (struct pool*)arg;

   while (1) {
      pthread_mutex_lock(&pool->mutex);
      if (is_full(pool))
         printf("put fail.\n");
      else {
         put(pool);
         printf("put successfully. available: %d\n", pool->num);
      }
      pthread_mutex_unlock(&pool->mutex);
   }

   return arg;
}

int main()
{
   struct pool pool;
   pthread_t producer_id, consumer_id;

   init(&pool);

   pthread_create(&producer_id, NULL, producer, &pool);
   pthread_create(&consumer_id, NULL, consumer, &pool);

   pthread_join(producer_id, NULL);
   pthread_join(consumer_id, NULL);

   return 0;
}

struct pool 有两个成员:num 用来记录当前个数,mutex 用来保证对 num 的互斥访问。在主程序开始时先对 mutex 初始化。初始化函数:

int pthread_mutex_init(pthread_mutex_t *restrict mutex,
      const pthread_mutexattr_t *restrict attr);

第一个参数是要初始化的变量,第二个参数是 mutex 的属性,如果为 NULL 则使用默认的属性。也可以使用

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

来代替默认属性的初始化。使用完 mutex 变量后可以调用函数

int pthread_mutex_destroy(pthread_mutex_t *mutex);

清理,此时 mutex 的状态是不确定的。如果想重新使用 mutex 则要再次执行 pthread_mutex_init()。

主进程创建了两个线程,分别是生产者和消费者,两个线程内都是死循环。每当它们对临界区进行访问时都必须对 mutex 加锁,防止自己在修改 num 的值时另一个线程来读取或修改 num 的值。加锁的函数是:

int pthread_mutex_lock(pthread_mutex_t *mutex);

每个线程想访问 num 的值时都必须先对 mutex 加锁。如果 mutex 已经被锁住,线程会阻塞直到其它线程对 mutex 解锁并且轮到自己为 mutex 加锁,从而继续执行临界区的代码。如果不想线程阻塞使用另外一个函数:

int pthread_mutex_trylock(pthread_mutex_t *mutex);

pthread_mutex_trylock() 会尝试对 mutex 加锁,如果成功返回 0,如果不能加锁会立即返回一个错误值而不会阻塞。线程中应该根据返回值进行不同的操作。

当结束对临界区的访问后为了可以让其它线程进行访问,要对 mutex 解锁:

int pthread_mutex_unlock(pthread_mutex_t *mutex);

一个生产者和多个消费者

消费者的工作流程:mutex_lock --> 检查是否有可用的物品 --> 使用物品 --> mutex_unlock。当只有一个生产者和多个消费者时,由于生产者效率比消费者的低,所以大多数情况下消费者做的都是无用功(没有物品可用),并且还降低了生产者的效率(消费者和生产者争夺 mutex)。由于生产者生产了物品消费者才能使用,当没有物品可用时可以让消费者进入睡眠状态,生产者生产出了物品后发出一个信号,唤醒消费者进行消费。

#include <stdio.h>
#include <pthread.h>

struct pool {
   pthread_mutex_t mutex;
   pthread_cond_t cond;
#define MAX_NUM 5
   int num;
};

/* mutex should be held by the caller when one of the following four inline functions is called */

static inline int is_full(struct pool* pool)
{
   return (pool->num == MAX_NUM);
}

static inline int is_empty(struct pool* pool)
{
   return (pool->num == 0);
}

static inline void put(struct pool* pool)
{
   ++pool->num;
}

static inline void get(struct pool* pool)
{
   --pool->num;
}

void init(struct pool* pool)
{
   pool->num = 0;
   pthread_mutex_init(&pool->mutex, NULL);
   pthread_cond_init(&pool->cond, NULL);
}

struct consumer_arg {
   struct pool* pool;
   int tid;
};

static void* consumer(void* arg)
{
   struct consumer_arg* carg = (struct consumer_arg*)arg;
   struct pool* pool = carg->pool;

   while (1) {
      pthread_mutex_lock(&pool->mutex);

      while (is_empty(pool))
         pthread_cond_wait(&pool->cond, &pool->mutex);

      get(pool);
      printf("tid %d get. available: %d\n", carg->tid, pool->num);

      pthread_mutex_unlock(&pool->mutex);
   }

   return arg;
}

static void* producer(void* arg)
{
   struct pool* pool = (struct pool*)arg;

   while (1) {
      pthread_mutex_lock(&pool->mutex);
      if (is_full(pool))
         printf("put fail.\n");
      else {
         put(pool);
         printf("put successfully. available: %d\n", pool->num);
      }
      pthread_cond_signal(&pool->cond);
      pthread_mutex_unlock(&pool->mutex);
   }

   return arg;
}

#define CONSUMER_NR 5

int main()
{
   int i;
   struct pool pool;
   struct consumer_arg carg[CONSUMER_NR];
   pthread_t producer_pid;
   pthread_t consumer_pid[CONSUMER_NR];

   init(&pool);

   pthread_create(&producer_pid, NULL, producer, &pool);

   for (i = 0; i < CONSUMER_NR; ++i) {
      carg[i].pool = &pool;
      carg[i].tid = i;
      pthread_create(&consumer_pid[i], NULL, consumer, &carg[i]);
   }

   pthread_join(producer_pid, NULL);
   for (i = 0; i < CONSUMER_NR; ++i)
      pthread_join(consumer_pid[i], NULL);

   return 0;
}

struct pool 结构里多了一个 pthread_cond_t 变量,用来实现信号相关的操作。初始化使用函数

int pthread_cond_init(pthread_cond_t *restrict cond,
      const pthread_condattr_t *restrict attr);

参数意义和 pthread_mutex_init() 相似,同样地也可以使用

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

静态初始化。变量用完后可以使用

int pthread_cond_destroy(pthread_cond_t *cond);

清理变量。

生产者每次往缓冲区里放入一个物品后发出一个信号通知消费者:

int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_signal() 发出一个信号,唤醒 至少 一个正在等待的消费者。如果需要通知所有等待的消费者,可以使用函数

int pthread_cond_broadcast(pthread_cond_t *cond);

消费者通过函数

int pthread_cond_wait(pthread_cond_t *restrict cond,
      pthread_mutex_t *restrict mutex);

等待生产者的信号。第一个参数和生产者使用的是同一个变量,第二个参数是保护缓冲区的 mutex 变量。如果没有收到生产者的信号,消费者会一直阻塞在 pthread_cond_wait() 中,直到生产者发出信号后根据调度策略等待被唤醒进行消费。

这里比较特别的是在消费者的函数里,在while(1) 循环中 pthread_mutex_lock() 函数后用了一个循环来判断缓冲区中是否有可用的物品:

      while (is_empty(pool))
         pthread_cond_wait(&pool->cond, &pool->mutex);

pthread_cond_wait() 函数实际上做了下面 3 件事情(参考资料 [1]):

  • pthread_mutex_unlock(&pool->mutex):因为进入了 pthread_cond_wait() 说明了当前的消费者线程获得了 mutex 并且缓冲区中没有可用的物品,所以要先对 mutex 进行解锁以便让生产者可以获得 mutex 向缓冲区中添加物品;
  • 等待生产者的信号;
  • pthread_mutex_lock(&pool->mutex)。

假设生产者进入 pthread_cond_signal(),消费者 1 刚进入 pthread_cond_wait(),在等待队列里还有若干个等待的消费者,下面是 pthread_cond_wait() 和 ptrhead_cond_signal() 的一个可能实现,注释是执行序列(参考资料 [2]):

pthread_cond_wait(mutex, cond):           /* consumer 1 */  /* consumer 2 */
    value = cond->value;                     /* 1 */
    pthread_mutex_unlock(mutex);             /* 2 */
    pthread_mutex_lock(cond->mutex);         /* 14 */          /* 10 */
    if (value == cond->value) {              /* 15 */          /* 11 */
        me->next_cond = cond->waiter;
        cond->waiter = me;
        pthread_mutex_unlock(cond->mutex);
        unable_to_run(me);
    } else
        pthread_mutex_unlock(cond->mutex);   /* 16 */          /* 12 */
    pthread_mutex_lock(mutex);               /* 17 */          /* 13 */

pthread_cond_signal(cond):                /* producer */
    pthread_mutex_lock(cond->mutex);         /* 3 */    
    cond->value++;                           /* 4 */    
    if (cond->waiter) {                      /* 5 */    
        sleeper = cond->waiter;              /* 6 */    
        cond->waiter = sleeper->next_cond;   /* 7 */    
        able_to_run(sleeper);                /* 8 */    
    }
    pthread_mutex_unlock(cond->mutex);       /* 9 */    

消费者 1 执行完第 2 步后被挂起,转而执行生产者部分(3-9 步)。第 8 步唤醒了一个正在在等待的消费者 2,生产者执行完第 9 步后执行刚被唤醒的消费者 2,消费者 2 获得了缓冲区的使用权(10-13 步),使用完缓冲区后对 mutex 解锁(pthread_cond_wait() 函数外),才回到消费者 1 执行(14-17 步)。但是这时缓冲区里的物品已经被消费者 2 消耗了,当消费者 1 退出 pthread_cond_wait() 后必须再次检查缓冲区的状态,否则就可能出错。所以在消费者中使用了 while 循环判断缓冲区的状态,这也是为什么 pthread_cond_signal() 可能唤醒不止一个消费者的原因。

参考资料

[1] Michael Kerrisk. The Linux Programming Interface.
[2] pthread_cond_signal(3) - Linux man page

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注