偶然看到 ticket spinlock 这个词,搜了一下相关的介绍,发现早在 1991 年就出现了(参考资料 1),并且还有根据论文作者命名的改进版 MCS spinlock。还找到了一篇由浅入深的介绍(参考资料 2)。本文基本是按照参考资料 2 的思路来写的。
自旋锁
顾名思义,自旋锁就是在一个变量上不停轮询,直到条件满足才继续往下执行:
typedef struct {
int val = 0;
} spinlock_t;
void lock(spinlock_t* lck) {
while (test_and_set(&lck->val) == 1) {}
}
void unlock(spinlock_t* lck) {
lck->val = 0;
}
其中 test_and_set() 是一个原子操作(参考资料 3),将参数的值设为 1 并返回修改前的值。因此加锁函数 lock() 做的事情就是一直尝试将 lck->val 的值设置为 1,如果修改前 lck->val 的值已经是 1(说明有其它任务持有这个锁)则继续循环,直到 lck->val 的值为 0,表示由当前任务抢到锁,然后就可以进入临界区了。
释放锁的函数 unlock() 将 lck->val 的值改为 0,这样其中一个在争抢锁的任务将会抢到锁。
自旋锁相对我们常说的互斥锁(mutex)的好处是不用让出当前 cpu,避免了线程被调度走的问题,对于临界区处理时间比较短的情况比较友好。但是存在两个比较明显的问题,一个是 lock() 每次检查都在同一个变量上执行原子操作,对于保持缓存一致性需要的开销很大;二是不确定哪个任务能够抢到,可能会导致某些任务一直抢不到锁而饿死。
ticket spinlock
为了避免以上问题,参考资料 1 提出了 ticket lock,即需要锁的时候先申请一张票然后排队,直到轮到自己才能进入临界区:
typedef struct {
int now_serving = 0;
int next_ticket = 0;
} ticket_spinlock_t;
void lock(ticket_spinlock_t* lck) {
int my_ticket = fetch_and_inc(&lck->next_ticket);
while (my_ticket != lck->now_serving) {}
}
void unlock(ticket_spinlock_t* lck) {
inc(&lck->now_serving);
}
其中 fetch_and_inc() 和 inc() 都是原子操作,前者表示先获取旧的值并递增变量,后者表示直接递增。
从上面的实现可以看出,lock() 函数只调用了一次原子操作,相较前一版的实现中每次判断都需要原子操作,大大减少了消耗;并且每个任务按照到来的顺序取号,保证所有加锁的调用最终都能进入临界区。
加锁函数中判断是否轮到自己的循环中还要轮询共享变量 now_serving,还可以进一步改进。
mcs spinlock
下面是参考资料 1 中提出的改进:
typedef struct mcs_node {
mcs_node* next;
bool is_locked;
} mcsnode_t;
typedef struct {
mcsnode_t* queue = NULL;
} mcs_spinlock_t;
void lock(mcs_spinlock_t* lck, mcsnode_t* node) {
node->next = NULL;
/*
+------+
| node | -> nil
+------+
*/
mcsnode_t* prev = fetch_and_store(&lck->queue, node);
/*
+-------+
| prev? |
+-------+
queue
|
v
+------+
| node | -> nil
+------+
*/
if (prev != NULL) {
/*
+------+
| prev | -> nil
+------+
*/
node->is_locked = true;
prev->next = node;
/*
queue
|
v
+------+ +------+
| prev | -> | node | -> nil
+------+ +------+
*/
while (node->is_locked) {}
}
}
void unlock(mcs_spinlock_t* lck, mcsnode_t* node) {
if (node->next == NULL) {
/*
+------+
| node | -> nil
+------+
*/
if (compare_and_swap(&lck->queue, node, NULL)) {
/*
+------+
| node | -> nil
+------+
queue
|
v
nil
*/
return;
}
/*
+------+
| node | -> nil
+------+
queue
|
v
+-----+
... -> | ... | -> nil
+-----+
*/
while (node->next == NULL) {}
/*
queue
|
v
+------+ +-----+
| node | -> ... -> | ... | -> nil
+------+ +-----+
*/
}
node->next->is_locked = false;
}
为了避免频繁访问同一个变量,mcs spinlock 让每个请求锁的任务提供一个自己的变量 node 来表示当前加锁请求在锁的等候队列中的位置。
在 lock() 函数中,如果队列为空(prev 为空),则不需等待直接返回;如果队列不为空,说明有其它任务持有这个锁,于是需要把自己加到队列中,然后等待前驱任务释放锁(最后的 while 循环)。
在 unlock() 函数中,如果当前任务是队列中的唯一任务(node->next == NULL),则把队列置空(compare_and_swap(...))然后返回即可;如果置空队列失败(queue 已经被其它节点修改),说明有任务正在入队列,则需要等待入队列完成(while 循环),然后把后继节点的 is_locked 设为 false(写入时会使后继节点的 cacheline 失效),下一个任务的 lock() 函数中的 while 循环可以知道锁已被释放,从而进入临界区。
以上的 while 循环都是在轮询本地变量,当锁被前一个任务释放的时候(node->next->is_locked = false 被写入)才会触发缓存同步的操作,大大减少了总线竞争。
参考资料 4 有论文中的一些伪代码。我实现了一个可运行的版本放在 这里。