1. 简介
RCU( Read-Copy Update)是内核中以读为主场景下的一种同步机制。
不需要使用锁,可以支持单个 updater 和多个 reader 的并发。对于被RCU保护的共享数据结构, reader 不需要获得任何锁就可以访问它,但 updater 在访问它时首先拷贝一个副本,然后对副本进行修改,最后在适当的时机把指向原来数据的指针指向新的版本。
RCU 一个重要的概念就是将“更新”(update) 操作分为两个阶段——“移除”(removal)和“回收”(reclamation)。“移除”阶段将数据结构内对数据元素的引用删除(或者指向数据元素的新版本),可以与 reader 并行。“回收”阶段将“移除”阶段移除的数据元素回收(或直接释放),需要在读取操作不再持有对这些数据元素的引用后执行,否则会引起混乱。两个阶段之间的间隔也被称之为 “宽限期”(grace period)。
一个典型的更新操作可以概括为如下步骤:
- 删除指向数据结构的指针,使得随后的 reader 无法获取到这一数据元素
- 等待所有之前的 reader 退出他们RCU相关的临界区
- 此时不再有任何 reader 持有对该数据结构的引用,因此可以安全的将其回收。
步骤2是延迟回收操作的关键,使得 reader 可以使用更轻量级的同步方式(在某些不可抢占内核的情境中,读取侧甚至没有任何额外开销)。传统的基于锁的方案, updater 一般会在原位置直接更新数据,与读取操作互斥,需要重量级的同步操作(锁)。相比之下,基于RCU的 updater 充分利用现代CPU对单个对齐的指针的写入是原子的这一特性,对一个链式结构上的数据元素进行原子性的插入、删除、替换操作,保证 reader 得到的是旧版本或新版本的完整数据,而非部分更新的存在 。并发的RCU reader 可以继续访问数据元素的旧版本,而避免使用原子操作(atomic operations)、内存屏障(memory barriers)、通信高速缓存未命中(communications cache misses)这些在对称多处理系统中即使没有锁竞争依然开销巨大的操作。
上述三步操作中,移除和回收两个操作推荐由不同的线程执行,参考内核中 directory-entry cache 的实现方式。
2. 核心API
RCU有很多API,但其实都是以下五个的变种或封装:
a. rcu_read_lock()
b. rcu_read_unlock()
c. synchronize_rcu() / call_rcu()
d. rcu_assign_pointer()
e. rcu_dereference()
2.1 rcu_read_lock()
与 rcu_read_unlock()
reader 调用这两个API来通知 reclaimer 一个 RCU 读取进入或退出了临界区。内核实现include/linux/rcupdate.h如下:
static inline void rcu_read_lock(void)
{
__rcu_read_lock();
__acquire(RCU);
rcu_lock_acquire(&rcu_lock_map);
RCU_LOCKDEP_WARN(!rcu_is_watching(),
"rcu_read_lock() used illegally while idle");
}
static inline void rcu_read_unlock(void)
{
RCU_LOCKDEP_WARN(!rcu_is_watching(),
"rcu_read_unlock() used illegally while idle");
__release(RCU);
__rcu_read_unlock();
rcu_lock_release(&rcu_lock_map); /* Keep acq info for rls diags. */
}
跳过 __acquire()
、 rcu_lock_acquire()
、__release()
、rcu_lock_release()
这些些选择编译函数,这两个 API 的关键在于禁止和启用抢占。虽然打开 CONFIG_PREEMPT_RCU 的内核允许抢占,但这是非法的。任何被 RCU 保护的共享数据结构在其关键读取步骤执行期间都不应该被回收。在读取侧临界区,不允许发生上下文切换。
#ifdef CONFIG_PREEMPT_RCU
void __rcu_read_lock(void)
{
current->rcu_read_lock_nesting++;
barrier(); /* critical section after entry code. */
}
EXPORT_SYMBOL_GPL(__rcu_read_lock);
void __rcu_read_unlock(void)
{
struct task_struct *t = current;
if (t->rcu_read_lock_nesting != 1) {
--t->rcu_read_lock_nesting;
} else {
barrier(); /* critical section before exit code. */
t->rcu_read_lock_nesting = INT_MIN;
barrier(); /* assign before ->rcu_read_unlock_special load */
if (unlikely(READ_ONCE(t->rcu_read_unlock_special.s)))
rcu_read_unlock_special(t);
barrier(); /* ->rcu_read_unlock_special load before assign */
t->rcu_read_lock_nesting = 0;
}
#ifdef CONFIG_PROVE_LOCKING
{
int rrln = READ_ONCE(t->rcu_read_lock_nesting);
WARN_ON_ONCE(rrln < 0 && rrln > INT_MIN / 2);
}
#endif /* #ifdef CONFIG_PROVE_LOCKING */
}
EXPORT_SYMBOL_GPL(__rcu_read_unlock);
#else /* #ifdef CONFIG_PREEMPT_RCU */
static inline void __rcu_read_lock(void)
{
if (IS_ENABLED(CONFIG_PREEMPT_COUNT))
preempt_disable();
}
static inline void __rcu_read_unlock(void)
{
if (IS_ENABLED(CONFIG_PREEMPT_COUNT))
preempt_enable();
}
#endif /* #else #ifdef CONFIG_PREEMPT_RCU */
另外,引用计数可以与 RCU 结合起来用来维护长期持有的数据结构。多个读取过程临界区可能嵌套和重叠,使用时需要注意。
2.2 rcu_assign_pointer()
updater 调用rcu_assign_pointer()
为使用 RCU 保护的指针分配一个新的值, 并执行给定CPU架构必需的内存屏障指令(memory-barrier instructions ),确保所有并发的 reader 可以取得旧版本内容。内核实现include/linux/rcupdate.h如下:
/**
* rcu_assign_pointer() - assign to RCU-protected pointer
* @p: pointer to assign to
* @v: value to assign (publish)
*/
#define rcu_assign_pointer(p, v) \
({ \
uintptr_t _r_a_p__v = (uintptr_t)(v); \
\
if (__builtin_constant_p(v) && (_r_a_p__v) == (uintptr_t)NULL) \
WRITE_ONCE((p), (typeof(p))(_r_a_p__v)); \
else \
smp_store_release(&p, RCU_INITIALIZER((typeof(p))_r_a_p__v)); \
_r_a_p__v; \
})
该 API 还用于记录被 RCU 保护的指针 和 给定结构体可被其它 CPU 访问的时间点,并因此而频繁被如 list_add_rcu() 等 RCU 列表处理原语间接调用。
2.3 rcu_dereference()
reader 调用rcu_dereference()
来获取一个 RCU 保护的指针,这个指针可以在之后被安全解引用。该 API 还会执行给定 CPU 架构必需的内存屏障指令(memory-barrier instructions)。目前,只有 Alpha 架构需要相关内存屏障,其它 CPUs 会将其编译为空。
/**
* rcu_dereference() - fetch RCU-protected pointer for dereferencing
* @p: The pointer to read, prior to dereferencing
*
* This is a simple wrapper around rcu_dereference_check().
*/
#define rcu_dereference(p) rcu_dereference_check(p, 0)
/**
* rcu_dereference_check() - rcu_dereference with debug checking
* @p: The pointer to read, prior to dereferencing
* @c: The conditions under which the dereference will take place
*/
#define rcu_dereference_check(p, c) __rcu_dereference_check((p), (c) || rcu_read_lock_held(), __rcu)
#define __rcu_dereference_check(p, c, space) \
({ \
/* Dependency order vs. p above. */ \
typeof(*p) *________p1 = (typeof(*p) *__force)lockless_dereference(p); \
RCU_LOCKDEP_WARN(!(c), "suspicious rcu_dereference_check() usage"); \
rcu_dereference_sparse(p, space); \
((typeof(*p) __force __kernel *)(________p1)); \
})
#define lockless_dereference(p) \
({ \
typeof(p) _________p1 = READ_ONCE(p); \
typeof(*(p)) *___typecheck_p __maybe_unused; \
smp_read_barrier_depends(); /* Dependency order vs. p above. */ \
(_________p1); \
})
注意,rcu_dereference() API 并不直接解除指针引用,相反,它保护指针直到后期解除引用为止。
与 rcu_assign_pointer() 相同,rcu_dereference() 可以记录被 RCU 保护的指针,特别是标记为可以在 rcu_dereference() 调用后任何时间点改变的指针。同样因此,该 API 频繁被如 list_for_each_entry_rcu() 等 RCU 列表处理原语间接调用。
2.4 synchronize_rcu()
synchronize_rcu()
会阻塞 updater 直到所有前置 RCU reader 退出临界区,然后执行 reclaimer,结束 updater。
#ifdef CONFIG_PREEMPT_RCU
void synchronize_rcu(void)
{
RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
lock_is_held(&rcu_lock_map) ||
lock_is_held(&rcu_sched_lock_map),
"Illegal synchronize_rcu() in RCU read-side critical section");
if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
return;
if (rcu_gp_is_expedited())
synchronize_rcu_expedited();
else
wait_rcu_gp(call_rcu);
}
EXPORT_SYMBOL_GPL(synchronize_rcu);
#else /* #ifdef CONFIG_PREEMPT_RCU */
static inline void synchronize_rcu(void)
{
synchronize_sched();
}
#endif /* #else #ifdef CONFIG_PREEMPT_RCU */
void synchronize_sched(void)
{
RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
lock_is_held(&rcu_lock_map) ||
lock_is_held(&rcu_sched_lock_map),
"Illegal synchronize_sched() in RCU-sched read-side critical section");
if (rcu_blocking_is_gp())
return;
if (rcu_gp_is_expedited())
synchronize_sched_expedited();
else
wait_rcu_gp(call_rcu_sched);
}
EXPORT_SYMBOL_GPL(synchronize_sched);
关键辅助函数 wait_rcu_gp()
在内核中的实现include/linux/rcupdate_wait.h如下:
/*
* Structure allowing asynchronous waiting on RCU.
*/
struct rcu_synchronize {
struct rcu_head head;
struct completion completion;
};
void __wait_rcu_gp(bool checktiny, int n, call_rcu_func_t *crcu_array,
struct rcu_synchronize *rs_array)
{
int i;
/* Initialize and register callbacks for each flavor specified. */
for (i = 0; i < n; i++) {
if (checktiny &&
(crcu_array[i] == call_rcu ||
crcu_array[i] == call_rcu_bh)) {
might_sleep();
continue;
}
init_rcu_head_on_stack(&rs_array[i].head);
init_completion(&rs_array[i].completion);
(crcu_array[i])(&rs_array[i].head, wakeme_after_rcu);
}
/* Wait for all callbacks to be invoked. */
for (i = 0; i < n; i++) {
if (checktiny &&
(crcu_array[i] == call_rcu ||
crcu_array[i] == call_rcu_bh))
continue;
wait_for_completion(&rs_array[i].completion);
destroy_rcu_head_on_stack(&rs_array[i].head);
}
}
EXPORT_SYMBOL_GPL(__wait_rcu_gp);
#define _wait_rcu_gp(checktiny, ...) \
do { \
call_rcu_func_t __crcu_array[] = { __VA_ARGS__ }; \
struct rcu_synchronize __rs_array[ARRAY_SIZE(__crcu_array)]; \
__wait_rcu_gp(checktiny, ARRAY_SIZE(__crcu_array), \
__crcu_array, __rs_array); \
} while (0)
#define wait_rcu_gp(...) _wait_rcu_gp(false, __VA_ARGS__)
可以看到,它初始化了 rcu_synchronize 类型的本地变量,一直等待所有回调函数完成。
2.4.1 RCU 抢占模式下的回调函数call_rcu()
/**
* call_rcu() - Queue a preemptible-RCU callback for invocation after a grace period.
* @head: structure to be used for queueing the RCU updates.
* @func: actual callback function to be invoked after the grace period
*/
void call_rcu(struct rcu_head *head, rcu_callback_t func)
{
__call_rcu(head, func, rcu_state_p, -1, 0);
}
EXPORT_SYMBOL_GPL(call_rcu);
其核心__call_rcu()
的实现kernel/rcu/tree.c如下(cpu=-1 代表当前CPU):
static void
__call_rcu(struct rcu_head *head, rcu_callback_t func,
struct rcu_state *rsp, int cpu, bool lazy)
{
unsigned long flags;
struct rcu_data *rdp;
/* Misaligned rcu_head! */
WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
if (debug_rcu_head_queue(head)) {
/* Probable double call_rcu(), so leak the callback. */
WRITE_ONCE(head->func, rcu_leak_callback);
WARN_ONCE(1, "__call_rcu(): Leaked duplicate callback\n");
return;
}
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = this_cpu_ptr(rsp->rda);
/* Add the callback to our list. */
if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
int offline;
if (cpu != -1)
rdp = per_cpu_ptr(rsp->rda, cpu);
if (likely(rdp->mynode)) {
/* Post-boot, so this should be for a no-CBs CPU. */
offline = !__call_rcu_nocb(rdp, head, lazy, flags);
WARN_ON_ONCE(offline);
/* Offline CPU, _call_rcu() illegal, leak callback. */
local_irq_restore(flags);
return;
}
/*
* Very early boot, before rcu_init(). Initialize if needed
* and then drop through to queue the callback.
*/
BUG_ON(cpu != -1);
WARN_ON_ONCE(!rcu_is_watching());
if (!likely(rdp->nxtlist))
init_default_callback_list(rdp);
}
WRITE_ONCE(rdp->qlen, rdp->qlen + 1);
if (lazy)
rdp->qlen_lazy++;
else
rcu_idle_count_callbacks_posted();
smp_mb(); /* Count before adding callback for rcu_barrier(). */
*rdp->nxttail[RCU_NEXT_TAIL] = head;
rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
if (__is_kfree_rcu_offset((unsigned long)func))
trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
rdp->qlen_lazy, rdp->qlen);
else
trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);
/* Go handle any RCU core processing required. */
__call_rcu_core(rsp, rdp, head, flags);
local_irq_restore(flags);
}
__call_rcu() 将参数传入的回调函数 func 赋值给一个 struct rcu_head,再将这个 head 加在了定义在每个 CPU 上的数据结构体 struct rcu_data 的 nxttail 链表上。synchronize_rcu() 传递的是 wakeme_after_rcu() 。该函数将条件变量置真,然后唤醒了在条件变量上等待的进程:
/**
* wakeme_after_rcu() - Callback function to awaken a task after grace period
* @head: Pointer to rcu_head member within rcu_synchronize structure
*/
void wakeme_after_rcu(struct rcu_head *head)
{
struct rcu_synchronize *rcu;
rcu = container_of(head, struct rcu_synchronize, head);
complete(&rcu->completion);
}
EXPORT_SYMBOL_GPL(wakeme_after_rcu);
至此可以得知,每个调用 call_rcu() / synchronize_rcu() 的进程都会将一个 rcu_head 挂到 CPU 对应的 rcu_data 的nxttail 链表上,然后挂起,直到(经过一个 grace period 后)所有前置 reader 退出读取临界区,最后触发这个 rcu_head 上的回调函数来唤醒 updater。整个过程如下图所示:
synchronize_rcu() after a grace period
| |
V register V
call_rcu() ---------------> wakeme_after_rcu()
| |
V wakeup V
wait_for_completion() <-------------- complete()
|
V
bh operations
在阻塞被禁止或 updater 需要更高性能的情境下 call_rcu() 将会替代 synchronize_rcu() 被直接调用,CPU可以转而进行其它的工作。但是,synchronize_rcu() 自带 nice 属性,系统会以之延迟 grace periods 降低 updater 更新速率,增强面对拒绝服务攻击的系统恢复能力。call_rcu() 需要调用者自己解决这一问题,限制更新速率。
2.4.1 RCU 非抢占模式下的回调函数call_rcu_sched()
/*
* Queue an RCU-sched callback for invocation after a grace period.
*/
void call_rcu_sched(struct rcu_head *head, rcu_callback_t func)
{
__call_rcu(head, func, &rcu_sched_state, -1, 0);
}
EXPORT_SYMBOL_GPL(call_rcu_sched);
其核心__call_rcu()
的实现与抢占模式下相同。
2.5 小结
5个核心 API 与 reader、updater、reclaimer 的关系如下:
rcu_assign_pointer()
+--------+
+---------------------->| reader |---------+
| +--------+ |
| | |
| | | Protect:
| | | rcu_read_lock()
| | | rcu_read_unlock()
| rcu_dereference() | |
+---------+ | |
| updater |<---------------------+ |
+---------+ V
| +-----------+
+----------------------------------->| reclaimer |
+-----------+
Defer:
synchronize_rcu() & call_rcu()
3. 宽限期 grace period
如何理解一个宽限期? CPU 如何得知一个宽限期结束了?
在 2.1 节中我们提到 rcu_read_lock() 会禁用抢占,禁止上下文切换,直到 rcu_read_unlock() 解除。换言之,若所有 CPU 都已经过一次上下文切换,则所有前置 reader 的临界区必定全部退出,一个 grace period 结束。
4. 简单示例
4.1 阻塞方式
定义结构体 foo 及其全局指针 gbl_foo:
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo __rcu *gbl_foo;
要修改目前 gbl_foo 指向结构体的域值 a,先新建一个相同的结构体,更新域值 a ,然后将 gbl_foo 指向新结构体,最后等待一个宽限期后释放旧的结构体:
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = rcu_dereference_protected(gbl_foo, lockdep_is_held(&foo_mutex));
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp); //确保各个并发 reader 读取的是完整的新的结构
spin_unlock(&foo_mutex);
synchronize_rcu(); // 确保 持有旧结构体的引用的 reader 完成
kfree(old_fp);
}
要获取当前 gbl_foo 指向的结构体的域值 a,需要调用 rcu_read_lock() 和 rcu_read_unlock() 接口保证读取过程中结构体不会被回收,并且调用 rcu_dereference() 接口保证 reader 获取的是结构体的初始化版本:
int foo_get_a(void)
{
int retval;
rcu_read_lock();
struct foo * fp = rcu_dereference(gbl_foo);
retval = fp->a;
rcu_read_unlock();
return retval;
}
概括而言,RCU 核心 API 的调用有以下几步:
- 调用 rcu_read_lock() / rcu_read_unlock() 以保证 RCU 读取侧的临界区。
- 在RCU 读取侧的临界区中,调用 rcu_dereference() 来解引用被 RCU 保护的指针。
- 使用可靠的方案(如锁或信号量)确保并发的 updater 不会相互影响。
- 调用 rcu_assign_pointer() 来更新一个被 RCU 保护的指针。 (该API 保证了 readers 和 updater 间的并发安全,updaters 间的 rcu_assign_pointer() 调用依然需要在锁或其它机制保证下进行。)
- 在 remove 一个被 RCU 保护的数据结构之后、reclaim 数据元素之前,调用 synchronize_rcu() 以等待前置 RCU reader 全部退出临界区。
4.2 回调方式
若阻塞被禁止,则需要利用回调函数。同样的结构体代码如下:
struct foo {
int a;
char b;
long c;
struct rcu_head rcu;
};
而更新函数写作如下形式:
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = rcu_dereference_protected(gbl_foo, lockdep_is_held(&foo_mutex));
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
call_rcu(&old_fp->rcu, foo_reclaim);
}
其中,foo_reclaim() 函数实现如下:
void foo_reclaim(struct rcu_head *rp)
{
struct foo *fp = container_of(rp, struct foo, rcu);
foo_cleanup(fp->a);
kfree(fp);
}
除了用 call_rcu() 替换了 synchronize_rcu() 之外,核心 API 的调用与上节并无不同。
- 在 从一个被 RCU 保护的数据结构中 remove 一个数据元素后,调用 call_rcu() 注册一个回调函数,等到前置 RCU 读取侧退出临界区后调用。
Tip:若call_rcu() 的回调函数仅执行 kfree() 操作,可以直接调用 kfree_rcu() 取代 call_rcu() 及自定义回调函数。
kfree_rcu(old_fp, rcu);