xv6内核扩展7:锁

发布于 作者: Ethan

内存分配器

程序用户/kalloctest 对 xv6 的内存分配器进行压力测试:三个进程增长和缩小它们的地址空间,导致大量调用 kalloc 和 kfree 。 kalloc 和 kfree 获取 kmem.lock 。kalloctest 打印(作为"#test-and-set")由于尝试获取另一个核心已经持有的锁而在 acquire 中的循环迭代次数,针对 kmem 锁和其他几个锁。 acquire 中的循环迭代次数是锁竞争的粗略衡量。在开始实验之前, kalloctest 的输出看起来类似于以下内容:

$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #test-and-set 18820 #acquire() 433058
lock: bcache: #test-and-set 0 #acquire() 1744
--- top 5 contended locks:
lock: uart: #test-and-set 97375 #acquire() 117
lock: virtio_disk: #test-and-set 96764 #acquire() 183
lock: proc: #test-and-set 45428 #acquire() 522884
lock: proc: #test-and-set 25506 #acquire() 522875
lock: kmem: #test-and-set 18820 #acquire() 433058
tot= 18820
test1 FAIL
start test2
total free number of pages: 32463 (out of 32768)
..........
test2 OK
start test3
..........child done 10000

test3 OK
start test4
..........child done 100000
--- lock kmem/bcache stats
lock: kmem: #test-and-set 135309 #acquire() 3006254
lock: bcache: #test-and-set 0 #acquire() 1860
--- top 5 contended locks:
lock: wait_lock: #test-and-set 37427486 #acquire() 40008
lock: proc: #test-and-set 3108381 #acquire() 1689532
lock: proc: #test-and-set 207812 #acquire() 1588761
lock: kmem: #test-and-set 135309 #acquire() 3006254
lock: uart: #test-and-set 97375 #acquire() 1303
tot= 135309
test4 FAIL m 59382 n 135309
$ 

你可能会看到与这里不同的计数,并且前五个竞争锁的顺序也不同。

acquire 为每个锁维护调用 acquire 的次数计数,以及 acquire 中尝试但未能设置锁的循环次数。kalloctest 调用一个系统调用,使内核打印 kmem 锁(本实验的重点)和 5 个竞争最激烈的锁的这些计数。如果存在锁竞争, acquire 循环的迭代次数会很大。该系统调用返回 kmem 锁的循环迭代次数之和。

对于本实验,你必须使用一台专用未加载的多核机器。如果你使用一台正在做其他事情的机器,kalloctest 打印的计数将毫无意义。你可以使用一台专用的 Athena 工作站,或你自己的笔记本电脑,但不要使用拨号上网的机器。

kalloctest 中锁竞争的根本原因是 kalloc() 只有一个自由列表,由一个锁保护。为了消除锁竞争,你需要重新设计内存分配器以避免单个锁和列表。基本思路是为每个 CPU 维护一个自由列表,每个列表有自己的锁。不同 CPU 上的分配和释放可以并行运行,因为每个 CPU 将操作不同的列表。主要挑战在于处理某个 CPU 的自由列表为空,而另一个 CPU 的列表有可用内存的情况;在这种情况下,一个 CPU 必须“窃取”另一个 CPU 的部分自由列表。窃取可能会引入锁竞争,但希望这种情况不常见。

你的工作是实现每个 CPU 的空闲列表,并在 CPU 的空闲列表为空时进行窃取。你必须给你的所有锁命名以"kmem"开头。也就是说,你应该为你的每个锁调用 initlock ,并传递一个以"kmem"开头的名称。运行 kalloctest 来检查你的实现是否减少了锁竞争。要检查它是否仍然可以分配所有内存,运行 usertests sbrkmuch 。你的输出将类似于下面显示的内容,尽管具体数字会有所不同,但 kmem 锁的总竞争将大大减少。确保 usertests -q 中的所有测试都通过。 make grade 应该显示 kalloctests 通过。

$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #test-and-set 0 #acquire() 117911
lock: kmem: #test-and-set 0 #acquire() 169464
lock: kmem: #test-and-set 0 #acquire() 145786
lock: bcache: #test-and-set 0 #acquire() 1744
--- top 5 contended locks:
lock: virtio_disk: #test-and-set 93747 #acquire() 183
lock: proc: #test-and-set 44010 #acquire() 526316
lock: proc: #test-and-set 24347 #acquire() 526309
lock: wait_lock: #test-and-set 11726 #acquire() 12
lock: pr: #test-and-set 4579 #acquire() 5
tot= 0
test1 OK
start test2
total free number of pages: 32463 (out of 32768)
..........
test2 OK
start test3
..........child done 10000

test3 OK
start test4
..........child done 100000
--- lock kmem/bcache stats
lock: kmem: #test-and-set 3673 #acquire() 827384
lock: kmem: #test-and-set 3449 #acquire() 1215152
lock: kmem: #test-and-set 1924 #acquire() 1236349
lock: kmem: #test-and-set 0 #acquire() 1014
lock: kmem: #test-and-set 0 #acquire() 1014
lock: kmem: #test-and-set 0 #acquire() 1014
lock: kmem: #test-and-set 0 #acquire() 1014
lock: kmem: #test-and-set 0 #acquire() 1014
lock: bcache: #test-and-set 0 #acquire() 1860
--- top 5 contended locks:
lock: wait_lock: #test-and-set 39121537 #acquire() 40020
lock: proc: #test-and-set 6853704 #acquire() 1672258
lock: proc: #test-and-set 214194 #acquire() 1614201
lock: uart: #test-and-set 195773 #acquire() 1459
lock: virtio_disk: #test-and-set 93747 #acquire() 183
tot= 9046

test4 OK
$ usertests sbrkmuch
usertests starting
test sbrkmuch: OK
ALL TESTS PASSED
$ usertests -q
...
ALL TESTS PASSED
$

一些提示:

  • 你可以使用 kernel/param.h 中的常量 NCPU 。
  • 让 freerange 将所有空闲内存分配给运行 freerange 的 CPU。
  • 函数 cpuid 返回当前核心编号,但只有在关闭中断时调用它和使用其结果才是安全的。你应该使用 push_off() 和 pop_off() 来关闭和开启中断。
  • 参考 kernel/sprintf.c 中的 snprintf 函数来获取字符串格式化的思路。不过,将所有锁都命名为 "kmem" 也是可以接受的。
  • 可选地,使用 xv6 的竞争检测器运行你的解决方案: kalloctest 可能会失败,但你应该看不到任何竞争。如果 xv6 的竞争检测器检测到竞争,它会打印两条堆栈跟踪来描述竞争情况,如下所示:在你的操作系统中,你可以通过将堆栈跟踪复制粘贴到 addr2line 中,将其转换为带行号的函数名。你不必运行竞争检测器,但可能会发现它很有帮助。请注意,竞争检测器会显著降低 xv6 的性能,所以当运行 usertests 时,你可能不希望使用它。

这是一个比较简单的任务,实现方法如下:

  1. 将原先的kmem定义为结构体memblock(其中包含一个spinlockrun),并定义struct memblock kmems[NCPU]
  2. kinit中对kmems中每一个元素initlock
  3. kfree()中先拿到当前cpuid(记得关掉interrupt),然后获取当前cpu所对应的memblock,后面的逻辑与先前一致
  4. kalloc()中:
    • 获取当前cpuid
    • 尝试alloc当前cpu对应的freelist(与先前逻辑一致)
    • 如果失败,则遍历每一个cpu的freelist尝试获取,如果成功则返回(steal过程)

注意:在kalloc中steal前需要释放当前cpu对应的freelist的lock,避免死锁。

读写锁

这半个作业与第一部分独立;无论你是否完成了第一部分,你都可以完成这半个部分(并通过测试)。

考虑 xv6 的 sys_pause 和 sys_uptime 函数,它们读取全局 ticks 变量。由于该变量可能被 clockintr 并发更新,这两个函数在读取 ticks 之前获取 tickslock 自旋锁。重要的是,这防止了 clockintr 并发修改 ticks ,但这也阻止了多个核心同时读取 ticks ,而这种情况本是可以接受的。在后一种情况下,自旋锁不必要地降低了性能。

解决这个问题的一个常见方案是使用读写锁。读写锁引入了两种锁持有者的概念:读者和写者。任何时候最多只有一个写者(并且当有写者时,不能有读者),但可以有多个读者同时存在(只要没有写者)。典型的读写锁 API 如下(见 kernel/defs.h ):

void            initrwlock(struct rwspinlock*);
void            read_acquire(struct rwspinlock*);
void            read_release(struct rwspinlock*);
void            write_acquire(struct rwspinlock*);
void            write_release(struct rwspinlock*);

读-写锁可能存在的一个微妙问题是,如果有许多读者,那么写者可能永远没有机会运行。换句话说,尽管读者不断获取和释放锁,但实际存在零个读者的时刻永远不会到来,因此写者可能永远无法获取锁。为了解决这个问题,读-写锁通常实现一个写者优先级方案:一旦写者尝试获取锁,后续的读者必须等待写者成功获取并释放锁(当然,在允许写者获取锁之前,需要等待当前读者释放锁)。

在 xv6 中实现一个读写自旋锁,遵循上述 API 和语义描述。确保读者不会饿死写者:如果有任何待定的写者,则不能再有读者获取锁。

你需要填写 kernel/spinlock.c 中读写自旋锁 API 的占位函数,并可能需要更改 kernel/spinlock.h 中 struct rwspinlock 的定义。

完成后,通过运行 rwlktest 来测试你的 rwspinlock 实现。你应该看到类似以下的输出:

$ rwlktest
rwspinlock_test: step 1
rwspinlock_test: step 2
rwspinlock_test: step 3
rwspinlock_test: step 4
rwspinlock_test: step 5
rwspinlock_test: step 6
rwspinlock_test: step 7
rwspinlock_test(0): 0
rwspinlock_test(2): 0
rwspinlock_test(1): 0
rwlktest: passed 3/3
$

确保 usertests -q 仍然通过。 make grade 在你完成时应通过所有测试。

一些提示:

  • 先看看 sys_rwlktest 中的 kernel/spinlock.c ;你应该能够分阶段测试你的实现。
  • 如果你在 xv6 中运行 rwlktest 而不修改 kernel/spinlock.c 中的任何内容,内核将打印 panic: acquire ,因为原始的 spinlock 实现一次只允许一个线程持有锁。你需要用你自己的锁实现替换 write/read_acquire_inner 和 write/read_release_inner 中的 acquire() 和 release() 的调用。
  • 要注意可能的交错问题。例如,如果一个读者看到没有待定的写者并想获取锁进行读取,你怎么知道仍然安全获取锁?
  • 了解 gcc 的原子操作内置函数可能会有所帮助;请参阅 https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html。
  1. 首先定义struct rwspinlock
    • struct spinlock ll
    • uint wlocked
    • uint pending_writer
    • uint reader_num
  2. 实现read_acquire_inner
    • 不断检查当前是否有pending writer或者是否被wlocked,如果有则spin
    • 获取ll
    • 再次检查如上内容,如果有则回到开始
    • 否则增加reader_num
    • 释放ll
  3. 实现read_release_inner
    • 减少reader_num
  4. 实现write_acquire_inner
    • 增加pending writer
    • 不断检查当前是否有reader,或者是否被wlocked
    • 如果有,则spin
    • 获取ll
    • 再次检查如上内容,如果有则回到开始
    • wlock,并减少pending writer
    • 释放ll
  5. 实现writer_release_inner
    • 去除wlocked

备注:

所有的读写操作必须使用内置__atomic方法,比如__atomic_store_n__atomic_load_natomic_fetch_sub等,并选择合适的内存排布,如__ATOMIC_RELAEXED/ACQ_REL/RELEASE

结果

== Test   kalloctest: test1 == 
  kalloctest: test1: OK 
== Test   kalloctest: test2 == 
  kalloctest: test2: OK 
== Test   kalloctest: test3 == 
  kalloctest: test3: OK 
== Test   kalloctest: test4 == 
  kalloctest: test4: OK 
== Test kalloctest: sbrkmuch == 
$ make qemu-gdb
kalloctest: sbrkmuch: OK (17.8s) 
== Test running rwlktest == 
$ make qemu-gdb
(55.3s) 
== Test   rwlktest == 
  rwlktest: OK 
== Test usertests == 
$ make qemu-gdb
usertests: OK (340.1s) 
== Test time == 
time: OK
Score: 100/100