认识 RCU

understand the theory of RCU

Posted by Max on June 29, 2017

1. 简介

RCU( Read-Copy Update)是内核中以读为主场景下的一种同步机制。

不需要使用锁,可以支持单个 updater 和多个 reader 的并发。对于被RCU保护的共享数据结构, reader 不需要获得任何锁就可以访问它,但 updater 在访问它时首先拷贝一个副本,然后对副本进行修改,最后在适当的时机把指向原来数据的指针指向新的版本。

RCU 一个重要的概念就是将“更新”(update) 操作分为两个阶段——“移除”(removal)和“回收”(reclamation)。“移除”阶段将数据结构内对数据元素的引用删除(或者指向数据元素的新版本),可以与 reader 并行。“回收”阶段将“移除”阶段移除的数据元素回收(或直接释放),需要在读取操作不再持有对这些数据元素的引用后执行,否则会引起混乱。两个阶段之间的间隔也被称之为 “宽限期”(grace period)。

一个典型的更新操作可以概括为如下步骤:

  1. 删除指向数据结构的指针,使得随后的 reader 无法获取到这一数据元素
  2. 等待所有之前的 reader 退出他们RCU相关的临界区
  3. 此时不再有任何 reader 持有对该数据结构的引用,因此可以安全的将其回收。

步骤2是延迟回收操作的关键,使得 reader 可以使用更轻量级的同步方式(在某些不可抢占内核的情境中,读取侧甚至没有任何额外开销)。传统的基于锁的方案, updater 一般会在原位置直接更新数据,与读取操作互斥,需要重量级的同步操作(锁)。相比之下,基于RCU的 updater 充分利用现代CPU对单个对齐的指针的写入是原子的这一特性,对一个链式结构上的数据元素进行原子性的插入、删除、替换操作,保证 reader 得到的是旧版本或新版本的完整数据,而非部分更新的存在 。并发的RCU reader 可以继续访问数据元素的旧版本,而避免使用原子操作(atomic operations)、内存屏障(memory barriers)、通信高速缓存未命中(communications cache misses)这些在对称多处理系统中即使没有锁竞争依然开销巨大的操作。

上述三步操作中,移除和回收两个操作推荐由不同的线程执行,参考内核中 directory-entry cache 的实现方式。

2. 核心API

RCU有很多API,但其实都是以下五个的变种或封装:

a. rcu_read_lock()
b. rcu_read_unlock()
c. synchronize_rcu() / call_rcu()
d. rcu_assign_pointer()
e. rcu_dereference()

2.1 rcu_read_lock()rcu_read_unlock()

reader 调用这两个API来通知 reclaimer 一个 RCU 读取进入或退出了临界区。内核实现include/linux/rcupdate.h如下:

static inline void rcu_read_lock(void)
{
    __rcu_read_lock();
    __acquire(RCU);
    rcu_lock_acquire(&rcu_lock_map);
    RCU_LOCKDEP_WARN(!rcu_is_watching(),
             "rcu_read_lock() used illegally while idle");
}

static inline void rcu_read_unlock(void)
{
    RCU_LOCKDEP_WARN(!rcu_is_watching(),
             "rcu_read_unlock() used illegally while idle");
    __release(RCU);
    __rcu_read_unlock();
    rcu_lock_release(&rcu_lock_map); /* Keep acq info for rls diags. */
}

跳过 __acquire()rcu_lock_acquire()__release()rcu_lock_release() 这些些选择编译函数,这两个 API 的关键在于禁止和启用抢占。虽然打开 CONFIG_PREEMPT_RCU 的内核允许抢占,但这是非法的。任何被 RCU 保护的共享数据结构在其关键读取步骤执行期间都不应该被回收。在读取侧临界区,不允许发生上下文切换。

#ifdef CONFIG_PREEMPT_RCU

void __rcu_read_lock(void)
{
    current->rcu_read_lock_nesting++;
    barrier();  /* critical section after entry code. */
}
EXPORT_SYMBOL_GPL(__rcu_read_lock);

void __rcu_read_unlock(void)
{
    struct task_struct *t = current;

    if (t->rcu_read_lock_nesting != 1) {
        --t->rcu_read_lock_nesting;
    } else {
        barrier();  /* critical section before exit code. */
        t->rcu_read_lock_nesting = INT_MIN;
        barrier();  /* assign before ->rcu_read_unlock_special load */
        if (unlikely(READ_ONCE(t->rcu_read_unlock_special.s)))
            rcu_read_unlock_special(t);
        barrier();  /* ->rcu_read_unlock_special load before assign */
        t->rcu_read_lock_nesting = 0;
    }
#ifdef CONFIG_PROVE_LOCKING
    {
        int rrln = READ_ONCE(t->rcu_read_lock_nesting);

        WARN_ON_ONCE(rrln < 0 && rrln > INT_MIN / 2);
    }
#endif /* #ifdef CONFIG_PROVE_LOCKING */
}
EXPORT_SYMBOL_GPL(__rcu_read_unlock);

#else /* #ifdef CONFIG_PREEMPT_RCU */

static inline void __rcu_read_lock(void)
{
    if (IS_ENABLED(CONFIG_PREEMPT_COUNT))
        preempt_disable();
}

static inline void __rcu_read_unlock(void)
{
    if (IS_ENABLED(CONFIG_PREEMPT_COUNT))
        preempt_enable();
}

#endif /* #else #ifdef CONFIG_PREEMPT_RCU */

另外,引用计数可以与 RCU 结合起来用来维护长期持有的数据结构。多个读取过程临界区可能嵌套和重叠,使用时需要注意。

2.2 rcu_assign_pointer()

updater 调用rcu_assign_pointer()为使用 RCU 保护的指针分配一个新的值, 并执行给定CPU架构必需的内存屏障指令(memory-barrier instructions ),确保所有并发的 reader 可以取得旧版本内容。内核实现include/linux/rcupdate.h如下:

/**
 * rcu_assign_pointer() - assign to RCU-protected pointer
 * @p: pointer to assign to
 * @v: value to assign (publish)
 */
#define rcu_assign_pointer(p, v)                          \
({                                          \
    uintptr_t _r_a_p__v = (uintptr_t)(v);                      \
                                          \
    if (__builtin_constant_p(v) && (_r_a_p__v) == (uintptr_t)NULL)          \
        WRITE_ONCE((p), (typeof(p))(_r_a_p__v));              \
    else                                      \
        smp_store_release(&p, RCU_INITIALIZER((typeof(p))_r_a_p__v)); \
    _r_a_p__v;                                  \
})

该 API 还用于记录被 RCU 保护的指针 和 给定结构体可被其它 CPU 访问的时间点,并因此而频繁被如 list_add_rcu() 等 RCU 列表处理原语间接调用。

2.3 rcu_dereference()

reader 调用rcu_dereference()来获取一个 RCU 保护的指针,这个指针可以在之后被安全解引用。该 API 还会执行给定 CPU 架构必需的内存屏障指令(memory-barrier instructions)。目前,只有 Alpha 架构需要相关内存屏障,其它 CPUs 会将其编译为空。

/**
 * rcu_dereference() - fetch RCU-protected pointer for dereferencing
 * @p: The pointer to read, prior to dereferencing
 *
 * This is a simple wrapper around rcu_dereference_check().
 */
#define rcu_dereference(p) rcu_dereference_check(p, 0)

/**
 * rcu_dereference_check() - rcu_dereference with debug checking
 * @p: The pointer to read, prior to dereferencing
 * @c: The conditions under which the dereference will take place
 */
#define rcu_dereference_check(p, c)  __rcu_dereference_check((p), (c) || rcu_read_lock_held(), __rcu)

#define __rcu_dereference_check(p, c, space) \
({ \
    /* Dependency order vs. p above. */ \
    typeof(*p) *________p1 = (typeof(*p) *__force)lockless_dereference(p); \
    RCU_LOCKDEP_WARN(!(c), "suspicious rcu_dereference_check() usage"); \
    rcu_dereference_sparse(p, space); \
    ((typeof(*p) __force __kernel *)(________p1)); \
})

#define lockless_dereference(p) \
({ \
    typeof(p) _________p1 = READ_ONCE(p); \
    typeof(*(p)) *___typecheck_p __maybe_unused; \
    smp_read_barrier_depends(); /* Dependency order vs. p above. */ \
    (_________p1); \
})

注意,rcu_dereference() API 并不直接解除指针引用,相反,它保护指针直到后期解除引用为止。

与 rcu_assign_pointer() 相同,rcu_dereference() 可以记录被 RCU 保护的指针,特别是标记为可以在 rcu_dereference() 调用后任何时间点改变的指针。同样因此,该 API 频繁被如 list_for_each_entry_rcu() 等 RCU 列表处理原语间接调用。

2.4 synchronize_rcu()

synchronize_rcu()会阻塞 updater 直到所有前置 RCU reader 退出临界区,然后执行 reclaimer,结束 updater。

#ifdef CONFIG_PREEMPT_RCU

void synchronize_rcu(void)
{
    RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
             lock_is_held(&rcu_lock_map) ||
             lock_is_held(&rcu_sched_lock_map),
             "Illegal synchronize_rcu() in RCU read-side critical section");
    if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
        return;
    if (rcu_gp_is_expedited())
        synchronize_rcu_expedited();
    else
        wait_rcu_gp(call_rcu);
}
EXPORT_SYMBOL_GPL(synchronize_rcu);

#else /* #ifdef CONFIG_PREEMPT_RCU */

static inline void synchronize_rcu(void)
{
    synchronize_sched();
}

#endif /* #else #ifdef CONFIG_PREEMPT_RCU */

void synchronize_sched(void)
{
    RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
             lock_is_held(&rcu_lock_map) ||
             lock_is_held(&rcu_sched_lock_map),
             "Illegal synchronize_sched() in RCU-sched read-side critical section");
    if (rcu_blocking_is_gp())
        return;
    if (rcu_gp_is_expedited())
        synchronize_sched_expedited();
    else
        wait_rcu_gp(call_rcu_sched);
}
EXPORT_SYMBOL_GPL(synchronize_sched);

关键辅助函数 wait_rcu_gp()在内核中的实现include/linux/rcupdate_wait.h如下:

/*
 * Structure allowing asynchronous waiting on RCU.
 */
struct rcu_synchronize {
    struct rcu_head head;
    struct completion completion;
};
void __wait_rcu_gp(bool checktiny, int n, call_rcu_func_t *crcu_array,
           struct rcu_synchronize *rs_array)
{
    int i;

    /* Initialize and register callbacks for each flavor specified. */
    for (i = 0; i < n; i++) {
        if (checktiny &&
            (crcu_array[i] == call_rcu ||
             crcu_array[i] == call_rcu_bh)) {
            might_sleep();
            continue;
        }
        init_rcu_head_on_stack(&rs_array[i].head);
        init_completion(&rs_array[i].completion);
        (crcu_array[i])(&rs_array[i].head, wakeme_after_rcu);
    }

    /* Wait for all callbacks to be invoked. */
    for (i = 0; i < n; i++) {
        if (checktiny &&
            (crcu_array[i] == call_rcu ||
             crcu_array[i] == call_rcu_bh))
            continue;
        wait_for_completion(&rs_array[i].completion);
        destroy_rcu_head_on_stack(&rs_array[i].head);
    }
}
EXPORT_SYMBOL_GPL(__wait_rcu_gp);

#define _wait_rcu_gp(checktiny, ...) \
do {                                    \
    call_rcu_func_t __crcu_array[] = { __VA_ARGS__ };        \
    struct rcu_synchronize __rs_array[ARRAY_SIZE(__crcu_array)];    \
    __wait_rcu_gp(checktiny, ARRAY_SIZE(__crcu_array),        \
            __crcu_array, __rs_array);            \
} while (0)

#define wait_rcu_gp(...) _wait_rcu_gp(false, __VA_ARGS__)

可以看到,它初始化了 rcu_synchronize 类型的本地变量,一直等待所有回调函数完成。

2.4.1 RCU 抢占模式下的回调函数call_rcu()

kernel/rcu/tree_plugin.h

/**
 * call_rcu() - Queue a preemptible-RCU callback for invocation after a grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual callback function to be invoked after the grace period
 */
void call_rcu(struct rcu_head *head, rcu_callback_t func)
{
    __call_rcu(head, func, rcu_state_p, -1, 0);
}
EXPORT_SYMBOL_GPL(call_rcu);

其核心__call_rcu()的实现kernel/rcu/tree.c如下(cpu=-1 代表当前CPU):

static void
__call_rcu(struct rcu_head *head, rcu_callback_t func,
       struct rcu_state *rsp, int cpu, bool lazy)
{
    unsigned long flags;
    struct rcu_data *rdp;

    /* Misaligned rcu_head! */
    WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));

    if (debug_rcu_head_queue(head)) {
        /* Probable double call_rcu(), so leak the callback. */
        WRITE_ONCE(head->func, rcu_leak_callback);
        WARN_ONCE(1, "__call_rcu(): Leaked duplicate callback\n");
        return;
    }
    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = this_cpu_ptr(rsp->rda);

    /* Add the callback to our list. */
    if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
        int offline;

        if (cpu != -1)
            rdp = per_cpu_ptr(rsp->rda, cpu);
        if (likely(rdp->mynode)) {
            /* Post-boot, so this should be for a no-CBs CPU. */
            offline = !__call_rcu_nocb(rdp, head, lazy, flags);
            WARN_ON_ONCE(offline);
            /* Offline CPU, _call_rcu() illegal, leak callback.  */
            local_irq_restore(flags);
            return;
        }
        /*
         * Very early boot, before rcu_init().  Initialize if needed
         * and then drop through to queue the callback.
         */
        BUG_ON(cpu != -1);
        WARN_ON_ONCE(!rcu_is_watching());
        if (!likely(rdp->nxtlist))
            init_default_callback_list(rdp);
    }
    WRITE_ONCE(rdp->qlen, rdp->qlen + 1);
    if (lazy)
        rdp->qlen_lazy++;
    else
        rcu_idle_count_callbacks_posted();
    smp_mb();  /* Count before adding callback for rcu_barrier(). */
    *rdp->nxttail[RCU_NEXT_TAIL] = head;
    rdp->nxttail[RCU_NEXT_TAIL] = &head->next;

    if (__is_kfree_rcu_offset((unsigned long)func))
        trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
                     rdp->qlen_lazy, rdp->qlen);
    else
        trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);

    /* Go handle any RCU core processing required. */
    __call_rcu_core(rsp, rdp, head, flags);
    local_irq_restore(flags);
}

__call_rcu() 将参数传入的回调函数 func 赋值给一个 struct rcu_head,再将这个 head 加在了定义在每个 CPU 上的数据结构体 struct rcu_data 的 nxttail 链表上。synchronize_rcu() 传递的是 wakeme_after_rcu() 。该函数将条件变量置真,然后唤醒了在条件变量上等待的进程:

/**
 * wakeme_after_rcu() - Callback function to awaken a task after grace period
 * @head: Pointer to rcu_head member within rcu_synchronize structure
 */
void wakeme_after_rcu(struct rcu_head *head)
{
    struct rcu_synchronize *rcu;

    rcu = container_of(head, struct rcu_synchronize, head);
    complete(&rcu->completion);
}
EXPORT_SYMBOL_GPL(wakeme_after_rcu);

至此可以得知,每个调用 call_rcu() / synchronize_rcu() 的进程都会将一个 rcu_head 挂到 CPU 对应的 rcu_data 的nxttail 链表上,然后挂起,直到(经过一个 grace period 后)所有前置 reader 退出读取临界区,最后触发这个 rcu_head 上的回调函数来唤醒 updater。整个过程如下图所示:

        synchronize_rcu()                after a grace period
              |                                   |
              V              register             V
          call_rcu()     --------------->  wakeme_after_rcu()
              |                                   |
              V                wakeup             V
     wait_for_completion() <--------------    complete()
              |
              V
         bh operations

在阻塞被禁止或 updater 需要更高性能的情境下 call_rcu() 将会替代 synchronize_rcu() 被直接调用,CPU可以转而进行其它的工作。但是,synchronize_rcu() 自带 nice 属性,系统会以之延迟 grace periods 降低 updater 更新速率,增强面对拒绝服务攻击的系统恢复能力。call_rcu() 需要调用者自己解决这一问题,限制更新速率。

2.4.1 RCU 非抢占模式下的回调函数call_rcu_sched()

kernel/rcu/tree.c

/*
 * Queue an RCU-sched callback for invocation after a grace period.
 */
void call_rcu_sched(struct rcu_head *head, rcu_callback_t func)
{
    __call_rcu(head, func, &rcu_sched_state, -1, 0);
}
EXPORT_SYMBOL_GPL(call_rcu_sched);

其核心__call_rcu()的实现与抢占模式下相同。

2.5 小结

5个核心 API 与 reader、updater、reclaimer 的关系如下:

     rcu_assign_pointer()
                                    +--------+
            +---------------------->| reader |---------+
            |                       +--------+         |
            |                           |              |
            |                           |              | Protect:
            |                           |              | rcu_read_lock()
            |                           |              | rcu_read_unlock()
            |        rcu_dereference()  |              |
       +---------+                      |              |
       | updater |<---------------------+              |
       +---------+                                     V
            |                                    +-----------+
            +----------------------------------->| reclaimer |
                                                 +-----------+
              Defer:
              synchronize_rcu() & call_rcu()

3. 宽限期 grace period

如何理解一个宽限期? CPU 如何得知一个宽限期结束了?

在 2.1 节中我们提到 rcu_read_lock() 会禁用抢占,禁止上下文切换,直到 rcu_read_unlock() 解除。换言之,若所有 CPU 都已经过一次上下文切换,则所有前置 reader 的临界区必定全部退出,一个 grace period 结束。

4. 简单示例

4.1 阻塞方式

定义结构体 foo 及其全局指针 gbl_foo:

struct foo {
    int a;
    char b;
    long c;
};
DEFINE_SPINLOCK(foo_mutex);

struct foo __rcu *gbl_foo;

要修改目前 gbl_foo 指向结构体的域值 a,先新建一个相同的结构体,更新域值 a ,然后将 gbl_foo 指向新结构体,最后等待一个宽限期后释放旧的结构体:

void foo_update_a(int new_a)
{
    struct foo *new_fp;
    struct foo *old_fp;

    new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
    spin_lock(&foo_mutex);
    old_fp = rcu_dereference_protected(gbl_foo, lockdep_is_held(&foo_mutex));
    *new_fp = *old_fp;
    new_fp->a = new_a;
    rcu_assign_pointer(gbl_foo, new_fp);  //确保各个并发 reader 读取的是完整的新的结构
    spin_unlock(&foo_mutex);
    synchronize_rcu();  // 确保 持有旧结构体的引用的 reader 完成
    kfree(old_fp);
}

要获取当前 gbl_foo 指向的结构体的域值 a,需要调用 rcu_read_lock() 和 rcu_read_unlock() 接口保证读取过程中结构体不会被回收,并且调用 rcu_dereference() 接口保证 reader 获取的是结构体的初始化版本:

int foo_get_a(void)
{
    int retval;

    rcu_read_lock();
    struct foo * fp = rcu_dereference(gbl_foo);
    retval = fp->a;
    rcu_read_unlock();
    return retval;
}

概括而言,RCU 核心 API 的调用有以下几步:

  • 调用 rcu_read_lock() / rcu_read_unlock() 以保证 RCU 读取侧的临界区。
  • 在RCU 读取侧的临界区中,调用 rcu_dereference() 来解引用被 RCU 保护的指针。
  • 使用可靠的方案(如锁或信号量)确保并发的 updater 不会相互影响。
  • 调用 rcu_assign_pointer() 来更新一个被 RCU 保护的指针。 (该API 保证了 readers 和 updater 间的并发安全,updaters 间的 rcu_assign_pointer() 调用依然需要在锁或其它机制保证下进行。)
  • 在 remove 一个被 RCU 保护的数据结构之后、reclaim 数据元素之前,调用 synchronize_rcu() 以等待前置 RCU reader 全部退出临界区。

4.2 回调方式

若阻塞被禁止,则需要利用回调函数。同样的结构体代码如下:

struct foo {
        int a;
        char b;
        long c;
        struct rcu_head rcu;
    };

而更新函数写作如下形式:

    void foo_update_a(int new_a)
    {
        struct foo *new_fp;
        struct foo *old_fp;

        new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
        spin_lock(&foo_mutex);
        old_fp = rcu_dereference_protected(gbl_foo, lockdep_is_held(&foo_mutex));
        *new_fp = *old_fp;
        new_fp->a = new_a;
        rcu_assign_pointer(gbl_foo, new_fp);
        spin_unlock(&foo_mutex);
        call_rcu(&old_fp->rcu, foo_reclaim);
    }

其中,foo_reclaim() 函数实现如下:

    void foo_reclaim(struct rcu_head *rp)
    {
        struct foo *fp = container_of(rp, struct foo, rcu);

        foo_cleanup(fp->a);

        kfree(fp);
    }

除了用 call_rcu() 替换了 synchronize_rcu() 之外,核心 API 的调用与上节并无不同。

  • 在 从一个被 RCU 保护的数据结构中 remove 一个数据元素后,调用 call_rcu() 注册一个回调函数,等到前置 RCU 读取侧退出临界区后调用。

Tip:若call_rcu() 的回调函数仅执行 kfree() 操作,可以直接调用 kfree_rcu() 取代 call_rcu() 及自定义回调函数。

    kfree_rcu(old_fp, rcu);

5. 参考

  1. Linux内核文档《whatisRCU.txt》
  2. 伯乐在线 - lvyilong316 《深入理解 RCU 实现》