互斥和同步
互斥(mutual exclusion,缩写mutex)是指一段区域在同一时间内只能有一个线程对其进行操作,否则会造成不一致的情况,这段区域叫做临界区。互斥只要求同一时间内只能有一个线程进行访问,但是线程之间的访问顺序可以是任意的;同步要求线程之间的访问有一定的顺序,并且一般都要求线程之间互斥访问(如果不修改临界区的值的话可以允许多个只读线程同时访问)。
生产者-消费者问题
同步和互斥的一个经典例子是生产者-消费者问题。假设一个缓冲区的大小为 N,如果缓冲区还没满,生产者每次可以往缓冲区里放入一个物品;如果缓冲区非空,消费者每次可以从缓冲区里取出一个物品。缓冲区是生产者和消费者共用的,同一时间内只能有一个线程可以对缓冲区进行修改,否则可能会出现错误。为了保证对缓冲区修改的原子性(即访问过程中不能被别的线程打断),可以对缓冲区加一个互斥锁(mutex lock)。当线程要访问缓冲区时需要先检查锁是否被其它线程占用,如果是的话就必须等待正在访问的线程释放;然后它占用互斥锁,防止别的线程在自己修改的过程中访问缓冲区;访问完毕后释放锁,让其它线程可以进行访问。
只有一个生产者和一个消费者
#include <stdio.h>
#include <pthread.h>
struct pool {
pthread_mutex_t mutex;
#define MAX_NUM 5
int num;
};
/* mutex should be held by the caller when one of the following four inline functions is called */
static inline int is_full(struct pool* pool)
{
return (pool->num == MAX_NUM);
}
static inline int is_empty(struct pool* pool)
{
return (pool->num == 0);
}
static inline void put(struct pool* pool)
{
++pool->num;
}
static inline void get(struct pool* pool)
{
--pool->num;
}
void init(struct pool* pool)
{
pool->num = 0;
pthread_mutex_init(&pool->mutex, NULL);
}
static void* consumer(void* arg)
{
struct pool* pool = (struct pool*)arg;
while (1) {
pthread_mutex_lock(&pool->mutex);
if (is_empty(pool))
printf("get fail.\n");
else {
get(pool);
printf("get successfully. available: %d\n", pool->num);
}
pthread_mutex_unlock(&pool->mutex);
}
return arg;
}
static void* producer(void* arg)
{
struct pool* pool = (struct pool*)arg;
while (1) {
pthread_mutex_lock(&pool->mutex);
if (is_full(pool))
printf("put fail.\n");
else {
put(pool);
printf("put successfully. available: %d\n", pool->num);
}
pthread_mutex_unlock(&pool->mutex);
}
return arg;
}
int main()
{
struct pool pool;
pthread_t producer_id, consumer_id;
init(&pool);
pthread_create(&producer_id, NULL, producer, &pool);
pthread_create(&consumer_id, NULL, consumer, &pool);
pthread_join(producer_id, NULL);
pthread_join(consumer_id, NULL);
return 0;
}
struct pool 有两个成员:num 用来记录当前个数,mutex 用来保证对 num 的互斥访问。在主程序开始时先对 mutex 初始化。初始化函数:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
第一个参数是要初始化的变量,第二个参数是 mutex 的属性,如果为 NULL 则使用默认的属性。也可以使用
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
来代替默认属性的初始化。使用完 mutex 变量后可以调用函数
int pthread_mutex_destroy(pthread_mutex_t *mutex);
清理,此时 mutex 的状态是不确定的。如果想重新使用 mutex 则要再次执行 pthread_mutex_init()。
主进程创建了两个线程,分别是生产者和消费者,两个线程内都是死循环。每当它们对临界区进行访问时都必须对 mutex 加锁,防止自己在修改 num 的值时另一个线程来读取或修改 num 的值。加锁的函数是:
int pthread_mutex_lock(pthread_mutex_t *mutex);
每个线程想访问 num 的值时都必须先对 mutex 加锁。如果 mutex 已经被锁住,线程会阻塞直到其它线程对 mutex 解锁并且轮到自己为 mutex 加锁,从而继续执行临界区的代码。如果不想线程阻塞使用另外一个函数:
int pthread_mutex_trylock(pthread_mutex_t *mutex);
pthread_mutex_trylock() 会尝试对 mutex 加锁,如果成功返回 0,如果不能加锁会立即返回一个错误值而不会阻塞。线程中应该根据返回值进行不同的操作。
当结束对临界区的访问后为了可以让其它线程进行访问,要对 mutex 解锁:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
一个生产者和多个消费者
消费者的工作流程:mutex_lock --> 检查是否有可用的物品 --> 使用物品 --> mutex_unlock。当只有一个生产者和多个消费者时,由于生产者效率比消费者的低,所以大多数情况下消费者做的都是无用功(没有物品可用),并且还降低了生产者的效率(消费者和生产者争夺 mutex)。由于生产者生产了物品消费者才能使用,当没有物品可用时可以让消费者进入睡眠状态,生产者生产出了物品后发出一个信号,唤醒消费者进行消费。
#include <stdio.h>
#include <pthread.h>
struct pool {
pthread_mutex_t mutex;
pthread_cond_t cond;
#define MAX_NUM 5
int num;
};
/* mutex should be held by the caller when one of the following four inline functions is called */
static inline int is_full(struct pool* pool)
{
return (pool->num == MAX_NUM);
}
static inline int is_empty(struct pool* pool)
{
return (pool->num == 0);
}
static inline void put(struct pool* pool)
{
++pool->num;
}
static inline void get(struct pool* pool)
{
--pool->num;
}
void init(struct pool* pool)
{
pool->num = 0;
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->cond, NULL);
}
struct consumer_arg {
struct pool* pool;
int tid;
};
static void* consumer(void* arg)
{
struct consumer_arg* carg = (struct consumer_arg*)arg;
struct pool* pool = carg->pool;
while (1) {
pthread_mutex_lock(&pool->mutex);
while (is_empty(pool))
pthread_cond_wait(&pool->cond, &pool->mutex);
get(pool);
printf("tid %d get. available: %d\n", carg->tid, pool->num);
pthread_mutex_unlock(&pool->mutex);
}
return arg;
}
static void* producer(void* arg)
{
struct pool* pool = (struct pool*)arg;
while (1) {
pthread_mutex_lock(&pool->mutex);
if (is_full(pool))
printf("put fail.\n");
else {
put(pool);
printf("put successfully. available: %d\n", pool->num);
}
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
return arg;
}
#define CONSUMER_NR 5
int main()
{
int i;
struct pool pool;
struct consumer_arg carg[CONSUMER_NR];
pthread_t producer_pid;
pthread_t consumer_pid[CONSUMER_NR];
init(&pool);
pthread_create(&producer_pid, NULL, producer, &pool);
for (i = 0; i < CONSUMER_NR; ++i) {
carg[i].pool = &pool;
carg[i].tid = i;
pthread_create(&consumer_pid[i], NULL, consumer, &carg[i]);
}
pthread_join(producer_pid, NULL);
for (i = 0; i < CONSUMER_NR; ++i)
pthread_join(consumer_pid[i], NULL);
return 0;
}
struct pool 结构里多了一个 pthread_cond_t 变量,用来实现信号相关的操作。初始化使用函数
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
参数意义和 pthread_mutex_init() 相似,同样地也可以使用
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
静态初始化。变量用完后可以使用
int pthread_cond_destroy(pthread_cond_t *cond);
清理变量。
生产者每次往缓冲区里放入一个物品后发出一个信号通知消费者:
int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_signal() 发出一个信号,唤醒 至少 一个正在等待的消费者。如果需要通知所有等待的消费者,可以使用函数
int pthread_cond_broadcast(pthread_cond_t *cond);
消费者通过函数
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
等待生产者的信号。第一个参数和生产者使用的是同一个变量,第二个参数是保护缓冲区的 mutex 变量。如果没有收到生产者的信号,消费者会一直阻塞在 pthread_cond_wait() 中,直到生产者发出信号后根据调度策略等待被唤醒进行消费。
这里比较特别的是在消费者的函数里,在while(1) 循环中 pthread_mutex_lock() 函数后用了一个循环来判断缓冲区中是否有可用的物品:
while (is_empty(pool))
pthread_cond_wait(&pool->cond, &pool->mutex);
pthread_cond_wait() 函数实际上做了下面 3 件事情(参考资料 [1]):
- pthread_mutex_unlock(&pool->mutex):因为进入了 pthread_cond_wait() 说明了当前的消费者线程获得了 mutex 并且缓冲区中没有可用的物品,所以要先对 mutex 进行解锁以便让生产者可以获得 mutex 向缓冲区中添加物品;
- 等待生产者的信号;
- pthread_mutex_lock(&pool->mutex)。
假设生产者进入 pthread_cond_signal(),消费者 1 刚进入 pthread_cond_wait(),在等待队列里还有若干个等待的消费者,下面是 pthread_cond_wait() 和 ptrhead_cond_signal() 的一个可能实现,注释是执行序列(参考资料 [2]):
pthread_cond_wait(mutex, cond): /* consumer 1 */ /* consumer 2 */
value = cond->value; /* 1 */
pthread_mutex_unlock(mutex); /* 2 */
pthread_mutex_lock(cond->mutex); /* 14 */ /* 10 */
if (value == cond->value) { /* 15 */ /* 11 */
me->next_cond = cond->waiter;
cond->waiter = me;
pthread_mutex_unlock(cond->mutex);
unable_to_run(me);
} else
pthread_mutex_unlock(cond->mutex); /* 16 */ /* 12 */
pthread_mutex_lock(mutex); /* 17 */ /* 13 */
pthread_cond_signal(cond): /* producer */
pthread_mutex_lock(cond->mutex); /* 3 */
cond->value++; /* 4 */
if (cond->waiter) { /* 5 */
sleeper = cond->waiter; /* 6 */
cond->waiter = sleeper->next_cond; /* 7 */
able_to_run(sleeper); /* 8 */
}
pthread_mutex_unlock(cond->mutex); /* 9 */
消费者 1 执行完第 2 步后被挂起,转而执行生产者部分(3-9 步)。第 8 步唤醒了一个正在在等待的消费者 2,生产者执行完第 9 步后执行刚被唤醒的消费者 2,消费者 2 获得了缓冲区的使用权(10-13 步),使用完缓冲区后对 mutex 解锁(pthread_cond_wait() 函数外),才回到消费者 1 执行(14-17 步)。但是这时缓冲区里的物品已经被消费者 2 消耗了,当消费者 1 退出 pthread_cond_wait() 后必须再次检查缓冲区的状态,否则就可能出错。所以在消费者中使用了 while 循环判断缓冲区的状态,这也是为什么 pthread_cond_signal() 可能唤醒不止一个消费者的原因。
参考资料
[1] Michael Kerrisk. The Linux Programming Interface.
[2] pthread_cond_signal(3) - Linux man page