mcs spinlock

偶然看到 ticket spinlock 这个词,搜了一下相关的介绍,发现早在 1991 年就出现了(参考资料 1),并且还有根据论文作者命名的改进版 MCS spinlock。找到了一篇由浅入深的介绍(参考资料 2),本文基本是按照这个思路来写的。最后再记录一下 k42 的实现。

自旋锁

顾名思义,自旋锁就是在一个变量上不停轮询,直到条件满足才继续往下执行:

typedef struct {
   int val = 0;
} spinlock_t;

void lock(spinlock_t* lck) {
    while (test_and_set(&lck->val) == 1) {}
}

void unlock(spinlock_t* lck) {
    lck->val = 0;
}

其中 test_and_set() 是一个原子操作(参考资料 3),将参数的值设为 1 并返回修改前的值。因此加锁函数 lock() 做的事情就是一直尝试将 lck->val 的值设置为 1,如果修改前 lck->val 的值已经是 1(说明有其它任务持有这个锁)则继续循环,直到 lck->val 的值为 0,表示由当前任务抢到锁,然后就可以进入临界区了。

释放锁的函数 unlock()lck->val 的值改为 0,这样其中一个在争抢锁的任务将会抢到锁。

自旋锁相对我们常说的互斥锁(mutex)的好处是不用让出当前 cpu,避免了线程被调度走的问题,对于临界区处理时间比较短的情况比较友好。但是存在两个比较明显的问题,一个是 lock() 每次检查都在同一个变量上执行原子操作,对于保持缓存一致性需要的开销很大;二是不确定哪个任务能够抢到,可能会导致某些任务一直抢不到锁而饿死。

ticket spinlock

为了避免以上问题,参考资料 1 提出了 ticket lock,即需要锁的时候先申请一张票然后排队,直到轮到自己才能进入临界区:

typedef struct {
    int now_serving = 0;
    int next_ticket = 0;
} ticket_spinlock_t;

void lock(ticket_spinlock_t* lck) {
    int my_ticket = fetch_and_inc(&lck->next_ticket);
    while (my_ticket != lck->now_serving) {}
}

void unlock(ticket_spinlock_t* lck) {
    inc(&lck->now_serving);
}

其中 fetch_and_inc()inc() 都是原子操作,前者表示先获取旧的值并递增变量,后者表示直接递增。

从上面的实现可以看出,lock() 函数只调用了一次原子操作,相较前一版的实现中每次判断都需要原子操作,大大减少了消耗;并且每个任务按照到来的顺序取号,保证所有加锁的调用最终都能进入临界区。

加锁函数中判断是否轮到自己的循环中还要轮询共享变量 now_serving,还可以进一步改进。

mcs spinlock

下面是参考资料 1 中提出的改进:

class MCSSpinLock final {
public:
    struct alignas(CACHELINE_SIZE) Node final {
        std::atomic<Node*> next;
        std::atomic<uint32_t> locked;
    };

public:
    void Lock(Node*);
    void Unlock(Node*);

private:
    std::atomic<Node*> m_tail = {nullptr};
};

void MCSSpinLock::Lock(Node* node) {
    node->next.store(nullptr, memory_order_relaxed);

    /*
       +------+
       | node | -> nil
       +------+
    */

    Node* prev = m_tail.exchange(node, memory_order_acq_rel);

    /*
       +-------+
       | prev? |
       +-------+

       m_tail
          |
          v
       +------+
       | node | -> nil
       +------+
    */

    if (prev) {
        /*
          +------+
          | prev | -> nil
          +------+
        */

        node->locked.store(1, memory_order_relaxed);
        prev->next.store(node, memory_order_release);

        /*
                      m_tail
                         |
                         v
          +------+    +------+
          | prev | -> | node | -> nil
          +------+    +------+
        */

        while (node->locked.load(memory_order_acquire)) {
            cpu_relax();
        }
    }
}

void MCSSpinLock::Unlock(Node* node) {
    MCSSpinLock::Node* next = node->next.load(memory_order_relaxed);
    if (!next) {
        /*
           +------+
           | node | -> nil
           +------+
        */

        MCSSpinLock::Node* head = node;
        if (m_tail.compare_exchange_strong(head, nullptr, memory_order_acq_rel,
                                           memory_order_relaxed)) {
            /*
              +------+
              | node | -> nil
              +------+

              m_tail
                 |
                 v
                nil
            */
            return;
        }

        /*
          +------+
          | node | -> nil
          +------+

                 m_tail
                    |
                    v
                 +-----+
          ... -> | ... | -> nil
                 +-----+
        */

        while ((next = node->next.load(memory_order_acquire)) == nullptr) {
            cpu_relax();
        }

        /*
                              m_tail
                                 |
                                 v
           +------+           +-----+
           | node | -> ... -> | ... | -> nil
           +------+           +-----+
        */
    }

    next->locked.store(0, memory_order_release);
}

为了避免频繁访问同一个变量,mcs spinlock 让每个请求锁的任务提供一个自己的变量 node 来表示当前加锁请求在锁的等候队列中的位置。

lock() 函数中,如果队列为空(prev 为空),则不需等待直接返回;如果队列不为空,说明有其它任务持有这个锁,于是需要把自己加到队列中,然后等待前驱任务释放锁(最后的 while 循环)。

unlock() 函数中,如果当前任务是队列中的唯一任务(node->next == NULL),则把队列置空(compare_and_swap(...))然后返回即可;如果置空队列失败(queue 已经被其它节点修改),说明有任务正在入队列,则需要等待入队列完成(while 循环),然后把后继节点的 is_locked 设为 false(写入时会使后继节点的 cacheline 失效),下一个任务的 lock() 函数中的 while 循环可以知道锁已被释放,从而进入临界区。

以上的 while 循环都是在轮询本地变量,当锁被前一个任务释放的时候(node->next->is_locked = false 被写入)才会触发缓存同步的操作,大大减少了总线竞争。

k42 mcs spinlock

参考资料 4 有论文中的一些伪代码,介绍了一个 K42(IBM 的一个操作系统)的变种实现,主要是通过增加一个标记变量 m_stub 来避免从外部传入 Node 变量:

/** K42 MCS variant implementation */

class MCSSpinLock final {
public:
    struct alignas(CACHELINE_SIZE) Node final {
        std::atomic<Node*> next = {nullptr};
        union {
            std::atomic<Node*> tail; // used by stub
            std::atomic<uint32_t> locked; // used by node
        };
    };

public:
    MCSSpinLock();
    void Lock();
    void Unlock();

private:
    Node m_stub;
};

MCSSpinLock::MCSSpinLock() {
    m_stub.tail.store(nullptr, memory_order_relaxed);
}

void MCSSpinLock::Lock() {
    Node node;

    /*
      +------+
      | node | -> nil
      +------+
    */

    Node* prev = m_stub.tail.exchange(&node, memory_order_acq_rel);

    /*
      +-------+
      | prev? |
      +-------+

       tail
         |
         v
      +------+
      | node | -> nil
      +------+
    */

    if (prev) {
        node.locked.store(1, memory_order_relaxed);
        prev->next.store(&node, memory_order_release);

        /*
                       tail
                         |
                         v
          +------+    +------+
          | prev | -> | node | -> nil
          +------+    +------+
        */

        while (node.locked.load(memory_order_acquire)) {
            cpu_relax();
        }
    }

    // now we have the lock

    Node* next = node.next.load(memory_order_relaxed);
    m_stub.next.store(next, memory_order_relaxed);
    if (!next) {
        /*
           tail
             |
             v
          +------+
          | node | -> nil
          +------+
        */

        Node* curr = &node;
        if (!m_stub.tail.compare_exchange_strong(
                curr, &m_stub, memory_order_acq_rel, memory_order_relaxed)) {
            /*
              +------+
              | node | -> nil
              +------+

               tail
                 |
                 v
              +-----+
              | ... | -> nil
              +-----+
            */

            while ((next = node.next.load(memory_order_acquire)) == nullptr) {
                cpu_relax();
            }

            /*
                                           tail
                                             |
                                             v
              +------+    +------+        +-----+
              | node | -> | next | -> ... | ... | -> nil
              +------+    +------+        +-----+
            */

            m_stub.next.store(next, memory_order_release);
        }
        /*
          else {
                tail
                  |
                  v
              +--------+
              | m_stub | -> nil
              +--------+
          }
        */
    }
}

void MCSSpinLock::Unlock() {
    Node* next = m_stub.next.load(memory_order_relaxed);
    if (!next) {
        Node* stub = &m_stub;

        /*
            tail?
              |
              v
          +--------+
          | m_stub |
          +--------+
        */
        if (m_stub.tail.compare_exchange_strong(
                stub, nullptr, memory_order_acq_rel, memory_order_relaxed)) {
            /*
              tail
                |
                v
               nil
            */
            return;
        }

        /*
          +------+
          | next | -> nil
          +------+

           tail
             |
             v
          +-----+
          | ... | -> nil
          +-----+
        */

        while ((next = m_stub.next.load(memory_order_acquire)) == nullptr) {
            cpu_relax();
        }
    }

    /*
      +------+        +-----+
      | next | -> ... | ... | -> nil
      +------+        +-----+
    */

    next->locked.store(0, memory_order_release);
}

基本原理和标准的 MCS 实现差不多,区别是在 MCSSpinLock 中加了个 stub 来标记当前队列为空或者有其它节点正在入队列(见注释图)。在 Lock() 函数中,加锁部分和标准实现一样,区别是因为用来记录队列位置的 node 是局部变量,因此需要在离开 Lock() 之前处理好 next 指针并记录在 m_stub.next 中,并且把 m_stub.tail 指向 m_stub,这样在 Unlock() 函数中通过 m_stub.tail 是否指向 m_stub 知道队列为空,还是有正在入队列的操作未完成。

虽然 K42 的实现避免了显式传入变量,和一般的 spinlock 接口保持一致,但是也导致了需要更多的判断和原子操作,以及对于共享变量 m_stub 的频繁访问。本来使用 spinlock 就是路径较短的操作,增加的这些操作成本似乎不划算。

其它

Linus 之前说过最好不要在用户空间使用自旋锁(参考资料 5)。mutex 实现中也有先自旋一会,如果无法获取锁再进入睡眠的优化(pthread 中设置 PTHREAD_MUTEX_ADAPTIVE_NP),大多数情况下会比 spinlock 要合适。由于锁的复杂性,他也不建议自己实现锁。

参考资料

  1. Algorithms for Scalable Synchronization on Shared-Memory Multiprocessors
  2. How does an MCS lock work?
  3. Test-and-set
  4. 参考资料 1 中的伪代码
  5. No nuances, just buggy code (was: related to Spinlock implementation and the Linux Scheduler)

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注