Lab链接:Lab: locks
Lab源码:momo/MIT-6S081/lock - Gitee.com
1 Memory allocator
题目
当前 xv6 调用 kalloc
/kfree
需要先获取 kmem.lock
锁,但是这个锁保护的临界区是整个内存空间,粒度较大
你的任务是缩小 kmem.lock
锁的粒度,具体做法是,每个 CPU 维护一个属于自己的内存页列表,以实现不同CPU独立自行管理内存,实现并行。但若当前CPU列表没有空闲页,则从另一个 CPU 空闲列表“窃取”一页分配,所以还是要锁
思路
原先kmem
结构体在这里表示一个CPU内存页列表 kmem_t
,使用kmem
全局数组表示每个CPU独立维护的内存管理单元
struct kmem_t {
struct spinlock lock;
struct run *freelist;
};
struct kmem_t kmem[NCPU];
在初始化每个CPU的内存分配器时,每个CPU分配内存空间的大小为page_cnt * PGSIZE
由于按照CPU编号从小到大的顺序进行分配,每个CPU获得的内存起始地址都是基于前序CPU已分配内存的累加结果。具体来说,第i个CPU的内存空间起始位置是在初始地址end
的基础上,加上前 i-1
个CPU所分配的全部内存页的总大小
因此,freerange
函数的操作范围在end + page_cnt * PGSIZE
到end + id * INTERVAL + PGSIZE
地址区间
void kinit() {
char lock_name[16];
void *pa;
int i, page_per_cpu, left, page_cnt;
// 计算每个CPU平均分配的页数
page_per_cpu = ((PHYSTOP - (uint64)end) / PGSIZE) / NCPU;
left = ((PHYSTOP - (uint64)end) / PGSIZE) % NCPU;
pa = (void*)end;
for (i = 0; i < NCPU; i++) {
// 初始化每个CPU的锁
snprintf(lock_name, 16, "kmem%d", i);
initlock(&kmem[i].lock, lock_name);
// 分配内存区域(考虑不能整除的情况)
page_cnt = left-- > 0 ? page_per_cpu + 1 : page_per_cpu;
freerange(pa, (void*)(pa + page_cnt * PGSIZE));
pa += page_cnt * PGSIZE;
}
}
释放页的逻辑与原先基本一致,只是获取的是本CPU的锁
分配页时,优先从当前CPU获取空闲页,如果没有空闲页时遍历其他CPU列表,每次操作前需获取对应CPU的锁
void *kalloc(void) {
for(int i = 0; i < NCPU; i++) {
if(i == cpuid()) continue;
acquire(&kmem[i].lock);
struct run *r = kmem[i].freelist;
if(r) {
kmem[i].freelist = r->next;
release(&kmem[i].lock);
return (void*)r;
}
release(&kmem[i].lock);
}
return NULL;
}
bug fix:
在完成初始版本后,出现“panic mappages ”报错,遂使用gdb调试排查原因。
常见的思路是,在哪报错就在报错所在函数的入口处打断点:这里使用
b vm.c:152
在mappages
函数入口处打上断点
c
继续执行,gdb在mappages
处暂停,使用bt full
查看当前调用栈,kinit -> kvminit ->kmappages打印参数,va的地址对应
memlayout.h
中PCIE页所在地址因此推断可能是初始化内存的范围超出了end~PHYSTOP,
kmappages
检测到该页已被映射,从而触发错误即问题根源在于
kinit
的初始化过程,而非原先怀疑的kalloc
或kfree
相关逻辑
2 Buffer cache
题目
当前 xv6 的块缓存(Buffer Cache)使用一个全局锁 bcache.lock
来保护所有块缓存,导致对缓冲区的任意读写操作完全串行执行的,这在高并发场景下会成为性能瓶颈。
本实验的目标是减少锁竞争,通过哈希分桶的方式让不同块的操作可以并行进行。
思路
所有缓存块按blockno
哈希到不同的桶中,这样仅当多个线程操作同一哈希桶时才会发生锁竞争,对不同桶的操作可以并行执行,显著减少锁地争用。
将缓存块按照哈希桶分组管理,bget/brelease
和内存管理器 kalloc/kfree
的操作类似,难点在于 bget
未命中缓存时,寻找合适的缓存块与替换过程。
根据实验提示,为每个缓冲区添加了记录最近使用时间的 ticks
字段。基于这一设计,寻找最近最久未使用(LRU)缓冲区的最直观方法,便是遍历所有哈希桶,从中找出引用计数 refcnt
为零且 ticks
值最小的那个缓冲区。
与内存管理器仅需定位单个空闲页即可完成替换不同,块缓存必须在全局所有缓存块中寻找 LRU 缓冲区,这一过程不仅涉及简单的锁保护,更引入了复杂的并发安全问题,如死锁避免等,主要包括以下几个情形:
1)如何协调多个 CPU 同时寻找 LRU 缓冲区的请求?如果并行执行,可能出现多个 CPU 选中同一缓冲区的情况。
通过引入全局锁 bcache.lock
,将整个 LRU 查找过程串行化,确保同一时间仅有一个 CPU 可执行查找操作。
2)即使使用全局锁,也只能保证 LRU 查找交换的过程是串行化的,但仍无法防止在查找过程中,其他线程修改之前已经确定待替换缓冲块的 refcnt
,从而导致数据不一致的情况?
任何对缓冲区的操作(bget
、brelse
或 LRU 查找),都必须先获取该缓冲区所在哈希桶的锁,从而确保同一桶内所有缓冲块的操作是串行执行的。
3)若一个线程在获取锁后又想获取另一个锁,如果不对加锁的顺序进行同一限制,可能会造成死锁。若有两个线程都在进行 LRU 查找交换的过程,线程1先检查了桶2和3,准备检索桶5,此时它已经获得了桶2和3的锁,申请桶5的锁;线程2先检查了桶5和6,准备检索桶3此时它已经获得了桶5和6的锁,但由于线程1持有锁3,因此它一直在等待它释放这个锁,然而线程1也一直等待线程2释放锁5,此时形成了死锁。
强制规定所有线程必须按照哈希桶的索引号从小到大顺序申请锁,可以避免死锁的发生。
4)既然已经规定了按顺序获取锁,是否仍需要全局锁来串行化整个 LRU 查找和替换过程?
没有必要了,毕竟所有 LRU 查找和替换过程都要先获得第一个桶的锁,某种程度上也是让这个操作按顺序执行了。
5)查找到 LRU 缓冲区后,应如何安全地将其从原桶移动至目标桶?移动过程中应何时释放相关桶锁?
移动过程本质是将缓冲块从原桶链表拆除,插入目标桶链表。
我一开始直接想的是,从原桶拆除后,即可释放旧锁。
这样的设计存在一个关键隐患:一旦移动线程释放了旧桶的锁,其他正在等待该锁的线程便可立即获取并访问原缓冲区,可能会修改其 refcnt
等状态。若此时该缓冲区已被移至新桶,但其 refcnt
已被改为1,便导致数据状态不一致,违反缓存一致性要求。
因此,必须在整个缓冲区移动过程中同时持有旧桶与新桶的锁,直至其完全成功迁移并更新元数据后,再统一释放这两把锁。
源码
初始化过程
和 Memory allocator 一样,原先的bcache
结构体改为在这里表示一个哈希桶bucket
使用kmem
全局数组表示每个CPU独立维护的内存管理单元
初始化过程暂略
struct bucket {
struct spinlock lock;
struct buf head;
};
struct {
struct spinlock lock;
struct buf buf[NBUF];
struct bucket bucket[NBUCKET];
} bcache;
LRU 时间计数方式
与原先释放块缓存的逻辑相比,先获取的是本桶的锁,原先通过在释放块缓存时增加写入,实当在读取/写入时,需要更新块缓存的最近使用时间,以
struct buf {
uint ticks; // 添加该字段用于记录本缓存的最近使用时间
};
struct buf* bread(uint dev, uint blockno) {
b->ticks = ticks; // 增加时间计数
}
void bwrite(struct buf *b) {
b->ticks = ticks; // 增加时间计数
}
重点:LRU Cache 获取与交换
寻找LRU Cache
// acquire(&bcache.lock);
for(i = 0; i < NBUCKET; i++) {
bk = &bcache.bucket[i];
acquire(&bk->lock);
for(b = bk->head.next; b != &bk->head; b = b->next) {
if(b->refcnt == 0) {
if(min_ticks == -1 || min_ticks > b->ticks) {
lrub = b;
min_ticks = b->ticks;
lrub_index = i;
}
}
}
}
如果没有空闲缓存块,直接报错,这里处理的比较直接
if(!lrub) {
for(i = 0; i < NBUCKET; i++)
release(&bcache.bucket[i].lock);
// release(&bcache.lock);
panic("bget: no buffers");
}
先释放与交换操作无关的哈希桶锁,允许其他线程同时访问这些未被占用的桶,从而降低锁竞争
for(i = 0; i < NBUCKET; i++) {
if (i != lrub_index && i != index) {
bk = &bcache.bucket[i];
release(&bk->lock);
}
}
如果原哈希桶和移动的目标哈希桶一致,不用交换,直接释放哈希桶的锁
if(lrub_index == index) {
release(&bcache.bucket[index].lock);
// release(&bcache.lock);
}
如果不一致,从原桶链表摘除 LRU 缓存块后添加到新桶,只有这两个操作都完成了才能释放两个哈希桶的锁
lrub->prev->next = lrub->next;
lrub->next->prev = lrub->prev;
bk = &bcache.bucket[index];
lrub->prev = &bk->head;
lrub->next = bk->head.next;
bk->head.next->prev = lrub;
bk->head.next = lrub;
release(&bcache.bucket[index].lock);
release(&bcache.bucket[lrub_index].lock);
// release(&bcache.lock);