偶然看到 ticket spinlock 这个词,搜了一下相关的介绍,发现早在 1991 年就出现了(参考资料 1),并且还有根据论文作者命名的改进版 MCS spinlock。找到了一篇由浅入深的介绍(参考资料 2),本文基本是按照这个思路来写的。最后再记录一下 k42 的实现。
自旋锁
顾名思义,自旋锁就是在一个变量上不停轮询,直到条件满足才继续往下执行:
typedef struct {
int val = 0;
} spinlock_t;
void lock(spinlock_t* lck) {
while (test_and_set(&lck->val) == 1) {}
}
void unlock(spinlock_t* lck) {
lck->val = 0;
}
其中 test_and_set() 是一个原子操作(参考资料 3),将参数的值设为 1 并返回修改前的值。因此加锁函数 lock() 做的事情就是一直尝试将 lck->val 的值设置为 1,如果修改前 lck->val 的值已经是 1(说明有其它任务持有这个锁)则继续循环,直到 lck->val 的值为 0,表示由当前任务抢到锁,然后就可以进入临界区了。
释放锁的函数 unlock() 将 lck->val 的值改为 0,这样其中一个在争抢锁的任务将会抢到锁。
自旋锁相对我们常说的互斥锁(mutex)的好处是不用让出当前 cpu,避免了线程被调度走的问题,对于临界区处理时间比较短的情况比较友好。但是存在两个比较明显的问题,一个是 lock() 每次检查都在同一个变量上执行原子操作,对于保持缓存一致性需要的开销很大;二是不确定哪个任务能够抢到,可能会导致某些任务一直抢不到锁而饿死。
ticket spinlock
为了避免以上问题,参考资料 1 提出了 ticket lock,即需要锁的时候先申请一张票然后排队,直到轮到自己才能进入临界区:
typedef struct {
int now_serving = 0;
int next_ticket = 0;
} ticket_spinlock_t;
void lock(ticket_spinlock_t* lck) {
int my_ticket = fetch_and_inc(&lck->next_ticket);
while (my_ticket != lck->now_serving) {}
}
void unlock(ticket_spinlock_t* lck) {
inc(&lck->now_serving);
}
其中 fetch_and_inc() 和 inc() 都是原子操作,前者表示先获取旧的值并递增变量,后者表示直接递增。
从上面的实现可以看出,lock() 函数只调用了一次原子操作,相较前一版的实现中每次判断都需要原子操作,大大减少了消耗;并且每个任务按照到来的顺序取号,保证所有加锁的调用最终都能进入临界区。
加锁函数中判断是否轮到自己的循环中还要轮询共享变量 now_serving,还可以进一步改进。
mcs spinlock
下面是参考资料 1 中提出的改进:
class MCSSpinLock final {
public:
struct alignas(CACHELINE_SIZE) Node final {
std::atomic<Node*> next;
std::atomic<uint32_t> locked;
};
public:
void Lock(Node*);
void Unlock(Node*);
private:
std::atomic<Node*> m_tail = {nullptr};
};
void MCSSpinLock::Lock(Node* node) {
node->next.store(nullptr, memory_order_relaxed);
/*
+------+
| node | -> nil
+------+
*/
Node* prev = m_tail.exchange(node, memory_order_acq_rel);
/*
+-------+
| prev? |
+-------+
m_tail
|
v
+------+
| node | -> nil
+------+
*/
if (prev) {
/*
+------+
| prev | -> nil
+------+
*/
node->locked.store(1, memory_order_relaxed);
prev->next.store(node, memory_order_release);
/*
m_tail
|
v
+------+ +------+
| prev | -> | node | -> nil
+------+ +------+
*/
while (node->locked.load(memory_order_acquire)) {
cpu_relax();
}
}
}
void MCSSpinLock::Unlock(Node* node) {
MCSSpinLock::Node* next = node->next.load(memory_order_relaxed);
if (!next) {
/*
+------+
| node | -> nil
+------+
*/
MCSSpinLock::Node* head = node;
if (m_tail.compare_exchange_strong(head, nullptr, memory_order_acq_rel,
memory_order_relaxed)) {
/*
+------+
| node | -> nil
+------+
m_tail
|
v
nil
*/
return;
}
/*
+------+
| node | -> nil
+------+
m_tail
|
v
+-----+
... -> | ... | -> nil
+-----+
*/
while ((next = node->next.load(memory_order_acquire)) == nullptr) {
cpu_relax();
}
/*
m_tail
|
v
+------+ +-----+
| node | -> ... -> | ... | -> nil
+------+ +-----+
*/
}
next->locked.store(0, memory_order_release);
}
为了避免频繁访问同一个变量,mcs spinlock 让每个请求锁的任务提供一个自己的变量 node 来表示当前加锁请求在锁的等候队列中的位置。
在 lock() 函数中,如果队列为空(prev 为空),则不需等待直接返回;如果队列不为空,说明有其它任务持有这个锁,于是需要把自己加到队列中,然后等待前驱任务释放锁(最后的 while 循环)。
在 unlock() 函数中,如果当前任务是队列中的唯一任务(node->next == NULL),则把队列置空(compare_and_swap(...))然后返回即可;如果置空队列失败(queue 已经被其它节点修改),说明有任务正在入队列,则需要等待入队列完成(while 循环),然后把后继节点的 is_locked 设为 false(写入时会使后继节点的 cacheline 失效),下一个任务的 lock() 函数中的 while 循环可以知道锁已被释放,从而进入临界区。
以上的 while 循环都是在轮询本地变量,当锁被前一个任务释放的时候(node->next->is_locked = false 被写入)才会触发缓存同步的操作,大大减少了总线竞争。
k42 mcs spinlock
参考资料 4 有论文中的一些伪代码,介绍了一个 K42(IBM 的一个操作系统)的变种实现,主要是通过增加一个标记变量 m_stub 来避免从外部传入 Node 变量:
/** K42 MCS variant implementation */
class MCSSpinLock final {
public:
struct alignas(CACHELINE_SIZE) Node final {
std::atomic<Node*> next = {nullptr};
union {
std::atomic<Node*> tail; // used by stub
std::atomic<uint32_t> locked; // used by node
};
};
public:
MCSSpinLock();
void Lock();
void Unlock();
private:
Node m_stub;
};
MCSSpinLock::MCSSpinLock() {
m_stub.tail.store(nullptr, memory_order_relaxed);
}
void MCSSpinLock::Lock() {
Node node;
/*
+------+
| node | -> nil
+------+
*/
Node* prev = m_stub.tail.exchange(&node, memory_order_acq_rel);
/*
+-------+
| prev? |
+-------+
tail
|
v
+------+
| node | -> nil
+------+
*/
if (prev) {
node.locked.store(1, memory_order_relaxed);
prev->next.store(&node, memory_order_release);
/*
tail
|
v
+------+ +------+
| prev | -> | node | -> nil
+------+ +------+
*/
while (node.locked.load(memory_order_acquire)) {
cpu_relax();
}
}
// now we have the lock
Node* next = node.next.load(memory_order_relaxed);
m_stub.next.store(next, memory_order_relaxed);
if (!next) {
/*
tail
|
v
+------+
| node | -> nil
+------+
*/
Node* curr = &node;
if (!m_stub.tail.compare_exchange_strong(
curr, &m_stub, memory_order_acq_rel, memory_order_relaxed)) {
/*
+------+
| node | -> nil
+------+
tail
|
v
+-----+
| ... | -> nil
+-----+
*/
while ((next = node.next.load(memory_order_acquire)) == nullptr) {
cpu_relax();
}
/*
tail
|
v
+------+ +------+ +-----+
| node | -> | next | -> ... | ... | -> nil
+------+ +------+ +-----+
*/
m_stub.next.store(next, memory_order_release);
}
/*
else {
tail
|
v
+--------+
| m_stub | -> nil
+--------+
}
*/
}
}
void MCSSpinLock::Unlock() {
Node* next = m_stub.next.load(memory_order_relaxed);
if (!next) {
Node* stub = &m_stub;
/*
tail?
|
v
+--------+
| m_stub |
+--------+
*/
if (m_stub.tail.compare_exchange_strong(
stub, nullptr, memory_order_acq_rel, memory_order_relaxed)) {
/*
tail
|
v
nil
*/
return;
}
/*
+------+
| next | -> nil
+------+
tail
|
v
+-----+
| ... | -> nil
+-----+
*/
while ((next = m_stub.next.load(memory_order_acquire)) == nullptr) {
cpu_relax();
}
}
/*
+------+ +-----+
| next | -> ... | ... | -> nil
+------+ +-----+
*/
next->locked.store(0, memory_order_release);
}
基本原理和标准的 MCS 实现差不多,区别是在 MCSSpinLock 中加了个 stub 来标记当前队列为空或者有其它节点正在入队列(见注释图)。在 Lock() 函数中,加锁部分和标准实现一样,区别是因为用来记录队列位置的 node 是局部变量,因此需要在离开 Lock() 之前处理好 next 指针并记录在 m_stub.next 中,并且把 m_stub.tail 指向 m_stub,这样在 Unlock() 函数中通过 m_stub.tail 是否指向 m_stub 知道队列为空,还是有正在入队列的操作未完成。
虽然 K42 的实现避免了显式传入变量,和一般的 spinlock 接口保持一致,但是也导致了需要更多的判断和原子操作,以及对于共享变量 m_stub 的频繁访问。本来使用 spinlock 就是路径较短的操作,增加的这些操作成本似乎不划算。
其它
Linus 之前说过最好不要在用户空间使用自旋锁(参考资料 5)。mutex 实现中也有先自旋一会,如果无法获取锁再进入睡眠的优化(pthread 中设置 PTHREAD_MUTEX_ADAPTIVE_NP),大多数情况下会比 spinlock 要合适。由于锁的复杂性,他也不建议自己实现锁。