mcs spinlock

偶然看到 ticket spinlock 这个词,搜了一下相关的介绍,发现早在 1991 年就出现了(参考资料 1),并且还有根据论文作者命名的改进版 MCS spinlock。还找到了一篇由浅入深的介绍(参考资料 2)。本文基本是按照参考资料 2 的思路来写的。

自旋锁

顾名思义,自旋锁就是在一个变量上不停轮询,直到条件满足才继续往下执行:

typedef struct {
   int val = 0;
} spinlock_t;

void lock(spinlock_t* lck) {
    while (test_and_set(&lck->val) == 1) {}
}

void unlock(spinlock_t* lck) {
    lck->val = 0;
}

其中 test_and_set() 是一个原子操作(参考资料 3),将参数的值设为 1 并返回修改前的值。因此加锁函数 lock() 做的事情就是一直尝试将 lck->val 的值设置为 1,如果修改前 lck->val 的值已经是 1(说明有其它任务持有这个锁)则继续循环,直到 lck->val 的值为 0,表示由当前任务抢到锁,然后就可以进入临界区了。

释放锁的函数 unlock()lck->val 的值改为 0,这样其中一个在争抢锁的任务将会抢到锁。

自旋锁相对我们常说的互斥锁(mutex)的好处是不用让出当前 cpu,避免了线程被调度走的问题,对于临界区处理时间比较短的情况比较友好。但是存在两个比较明显的问题,一个是 lock() 每次检查都在同一个变量上执行原子操作,对于保持缓存一致性需要的开销很大;二是不确定哪个任务能够抢到,可能会导致某些任务一直抢不到锁而饿死。

ticket spinlock

为了避免以上问题,参考资料 1 提出了 ticket lock,即需要锁的时候先申请一张票然后排队,直到轮到自己才能进入临界区:

typedef struct {
    int now_serving = 0;
    int next_ticket = 0;
} ticket_spinlock_t;

void lock(ticket_spinlock_t* lck) {
    int my_ticket = fetch_and_inc(&lck->next_ticket);
    while (my_ticket != lck->now_serving) {}
}

void unlock(ticket_spinlock_t* lck) {
    inc(&lck->now_serving);
}

其中 fetch_and_inc()inc() 都是原子操作,前者表示先获取旧的值并递增变量,后者表示直接递增。

从上面的实现可以看出,lock() 函数只调用了一次原子操作,相较前一版的实现中每次判断都需要原子操作,大大减少了消耗;并且每个任务按照到来的顺序取号,保证所有加锁的调用最终都能进入临界区。

加锁函数中判断是否轮到自己的循环中还要轮询共享变量 now_serving,还可以进一步改进。

mcs spinlock

下面是参考资料 1 中提出的改进:

typedef struct mcs_node {
    mcs_node* next;
    bool is_locked;
} mcsnode_t;

typedef struct {
    mcsnode_t* queue = NULL;
} mcs_spinlock_t;

void lock(mcs_spinlock_t* lck, mcsnode_t* node) {
    node->next = NULL;

    /*
       +------+
       | node | -> nil
       +------+
    */

    mcsnode_t* prev = fetch_and_store(&lck->queue, node);

    /*
       +-------+
       | prev? |
       +-------+

        queue
          |
          v
       +------+
       | node | -> nil
       +------+
    */

    if (prev != NULL) {
        /*
          +------+
          | prev | -> nil
          +------+
        */

        node->is_locked = true;
        prev->next = node;

        /*
                       queue
                         |
                         v
          +------+    +------+
          | prev | -> | node | -> nil
          +------+    +------+
        */

        while (node->is_locked) {}
    }
}

void unlock(mcs_spinlock_t* lck, mcsnode_t* node) {
    if (node->next == NULL) {
        /*
           +------+
           | node | -> nil
           +------+
        */

        if (compare_and_swap(&lck->queue, node, NULL)) {
            /*
              +------+
              | node | -> nil
              +------+

               queue
                 |
                 v
                nil
            */
            return;
        }

        /*
          +------+
          | node | -> nil
          +------+

                  queue
                    |
                    v
                 +-----+
          ... -> | ... | -> nil
                 +-----+
        */

        while (node->next == NULL) {}

        /*
                               queue
                                 |
                                 v
           +------+           +-----+
           | node | -> ... -> | ... | -> nil
           +------+           +-----+
        */
    }

    node->next->is_locked = false;
}

为了避免频繁访问同一个变量,mcs spinlock 让每个请求锁的任务提供一个自己的变量 node 来表示当前加锁请求在锁的等候队列中的位置。

lock() 函数中,如果队列为空(prev 为空),则不需等待直接返回;如果队列不为空,说明有其它任务持有这个锁,于是需要把自己加到队列中,然后等待前驱任务释放锁(最后的 while 循环)。

unlock() 函数中,如果当前任务是队列中的唯一任务(node->next == NULL),则把队列置空(compare_and_swap(...))然后返回即可;如果置空队列失败(queue 已经被其它节点修改),说明有任务正在入队列,则需要等待入队列完成(while 循环),然后把后继节点的 is_locked 设为 false(写入时会使后继节点的 cacheline 失效),下一个任务的 lock() 函数中的 while 循环可以知道锁已被释放,从而进入临界区。

以上的 while 循环都是在轮询本地变量,当锁被前一个任务释放的时候(node->next->is_locked = false 被写入)才会触发缓存同步的操作,大大减少了总线竞争。

参考资料 4 有论文中的一些伪代码。我实现了一个可运行的版本放在 这里

参考资料

  1. Algorithms for Scalable Synchronization on Shared-Memory Multiprocessors
  2. How does an MCS lock work?
  3. Test-and-set
  4. 参考资料 1 中的伪代码

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注