Lab8 Lock


Lab链接:Lab: locks

Lab源码:momo/MIT-6S081/lock - Gitee.com

1 Memory allocator

题目

当前 xv6 调用 kalloc/kfree 需要先获取 kmem.lock 锁,但是这个锁保护的临界区是整个内存空间,粒度较大

你的任务是缩小 kmem.lock 锁的粒度,具体做法是,每个 CPU 维护一个属于自己的内存页列表,以实现不同CPU独立自行管理内存,实现并行。但若当前CPU列表没有空闲页,则从另一个 CPU 空闲列表“窃取”一页分配,所以还是要锁

思路

原先kmem结构体在这里表示一个CPU内存页列表 kmem_t,使用kmem全局数组表示每个CPU独立维护的内存管理单元

struct kmem_t {
  struct spinlock lock;
  struct run *freelist;
};

struct kmem_t kmem[NCPU];

在初始化每个CPU的内存分配器时,每个CPU分配内存空间的大小为page_cnt * PGSIZE

由于按照CPU编号从小到大的顺序进行分配,每个CPU获得的内存起始地址都是基于前序CPU已分配内存的累加结果。具体来说,第i个CPU的内存空间起始位置是在初始地址end的基础上,加上前 i-1 个CPU所分配的全部内存页的总大小

因此,freerange函数的操作范围在end + page_cnt * PGSIZEend + id * INTERVAL + PGSIZE地址区间

void kinit() {
    char lock_name[16];
    void *pa;
    int i, page_per_cpu, left, page_cnt;

    // 计算每个CPU平均分配的页数
    page_per_cpu = ((PHYSTOP - (uint64)end) / PGSIZE) / NCPU;
    left = ((PHYSTOP - (uint64)end) / PGSIZE) % NCPU;
    pa = (void*)end;

    for (i = 0; i < NCPU; i++) {
        // 初始化每个CPU的锁
        snprintf(lock_name, 16, "kmem%d", i);
        initlock(&kmem[i].lock, lock_name);

        // 分配内存区域(考虑不能整除的情况)
        page_cnt = left-- > 0 ? page_per_cpu + 1 : page_per_cpu;
        freerange(pa, (void*)(pa + page_cnt * PGSIZE));
        pa += page_cnt * PGSIZE;
    }
}

释放页的逻辑与原先基本一致,只是获取的是本CPU的锁

分配页时,优先从当前CPU获取空闲页,如果没有空闲页时遍历其他CPU列表,每次操作前需获取对应CPU的锁

void *kalloc(void) {
    for(int i = 0; i < NCPU; i++) {
        if(i == cpuid()) continue;
        acquire(&kmem[i].lock);
        struct run *r = kmem[i].freelist;
        if(r) {
            kmem[i].freelist = r->next;
            release(&kmem[i].lock);
            return (void*)r;
        }
        release(&kmem[i].lock);
    }
    return NULL;
}

bug fix:

在完成初始版本后,出现“panic mappages ”报错,遂使用gdb调试排查原因。

常见的思路是,在哪报错就在报错所在函数的入口处打断点:这里使用b vm.c:152mappages函数入口处打上断点

c继续执行,gdb在mappages处暂停,使用bt full查看当前调用栈,kinit -> kvminit ->kmappages

打印参数,va的地址对应memlayout.h中PCIE页所在地址

因此推断可能是初始化内存的范围超出了end~PHYSTOP,kmappages检测到该页已被映射,从而触发错误

即问题根源在于kinit的初始化过程,而非原先怀疑的kallockfree相关逻辑

2 Buffer cache

题目

当前 xv6 的块缓存(Buffer Cache)使用一个全局锁 bcache.lock 来保护所有块缓存,导致对缓冲区的任意读写操作完全串行执行的,这在高并发场景下会成为性能瓶颈。

本实验的目标是减少锁竞争,通过哈希分桶的方式让不同块的操作可以并行进行。

思路

所有缓存块按blockno哈希到不同的桶中,这样仅当多个线程操作同一哈希桶时才会发生锁竞争,对不同桶的操作可以并行执行,显著减少锁地争用。

将缓存块按照哈希桶分组管理,bget/brelease 和内存管理器 kalloc/kfree 的操作类似,难点在于 bget未命中缓存时,寻找合适的缓存块与替换过程

根据实验提示,为每个缓冲区添加了记录最近使用时间的 ticks 字段。基于这一设计,寻找最近最久未使用(LRU)缓冲区的最直观方法,便是遍历所有哈希桶,从中找出引用计数 refcnt 为零且 ticks 值最小的那个缓冲区。

与内存管理器仅需定位单个空闲页即可完成替换不同,块缓存必须在全局所有缓存块中寻找 LRU 缓冲区,这一过程不仅涉及简单的锁保护,更引入了复杂的并发安全问题,如死锁避免等,主要包括以下几个情形:

1)如何协调多个 CPU 同时寻找 LRU 缓冲区的请求?如果并行执行,可能出现多个 CPU 选中同一缓冲区的情况。

通过引入全局锁 bcache.lock,将整个 LRU 查找过程串行化,确保同一时间仅有一个 CPU 可执行查找操作。

2)即使使用全局锁,也只能保证 LRU 查找交换的过程是串行化的,但仍无法防止在查找过程中,其他线程修改之前已经确定待替换缓冲块的 refcnt ,从而导致数据不一致的情况?

任何对缓冲区的操作(bgetbrelse 或 LRU 查找),都必须先获取该缓冲区所在哈希桶的锁,从而确保同一桶内所有缓冲块的操作是串行执行的。

3)若一个线程在获取锁后又想获取另一个锁,如果不对加锁的顺序进行同一限制,可能会造成死锁。若有两个线程都在进行 LRU 查找交换的过程,线程1先检查了桶2和3,准备检索桶5,此时它已经获得了桶2和3的锁,申请桶5的锁;线程2先检查了桶5和6,准备检索桶3此时它已经获得了桶5和6的锁,但由于线程1持有锁3,因此它一直在等待它释放这个锁,然而线程1也一直等待线程2释放锁5,此时形成了死锁。

强制规定所有线程必须按照哈希桶的索引号从小到大顺序申请锁,可以避免死锁的发生。

4)既然已经规定了按顺序获取锁,是否仍需要全局锁来串行化整个 LRU 查找和替换过程?

没有必要了,毕竟所有 LRU 查找和替换过程都要先获得第一个桶的锁,某种程度上也是让这个操作按顺序执行了。

5)查找到 LRU 缓冲区后,应如何安全地将其从原桶移动至目标桶?移动过程中应何时释放相关桶锁?

移动过程本质是将缓冲块从原桶链表拆除,插入目标桶链表。

我一开始直接想的是,从原桶拆除后,即可释放旧锁。

这样的设计存在一个关键隐患:一旦移动线程释放了旧桶的锁,其他正在等待该锁的线程便可立即获取并访问原缓冲区,可能会修改其 refcnt 等状态。若此时该缓冲区已被移至新桶,但其 refcnt 已被改为1,便导致数据状态不一致,违反缓存一致性要求。

因此,必须在整个缓冲区移动过程中同时持有旧桶与新桶的锁,直至其完全成功迁移并更新元数据后,再统一释放这两把锁。

源码

初始化过程

和 Memory allocator 一样,原先的bcache结构体改为在这里表示一个哈希桶bucket

使用kmem全局数组表示每个CPU独立维护的内存管理单元

初始化过程暂略

struct bucket {
  struct spinlock lock;
  struct buf head;
};

struct {
  struct spinlock lock;
  struct buf buf[NBUF];
  struct bucket bucket[NBUCKET];
} bcache;

LRU 时间计数方式

与原先释放块缓存的逻辑相比,先获取的是本桶的锁,原先通过在释放块缓存时增加写入,实当在读取/写入时,需要更新块缓存的最近使用时间,以

struct buf {
  uint ticks;  // 添加该字段用于记录本缓存的最近使用时间
};

struct buf* bread(uint dev, uint blockno) {
  b->ticks = ticks;  // 增加时间计数
}

void bwrite(struct buf *b) {
  b->ticks = ticks;  // 增加时间计数
}

重点:LRU Cache 获取与交换

寻找LRU Cache

// acquire(&bcache.lock);
for(i = 0; i < NBUCKET; i++) {
    bk = &bcache.bucket[i];
    acquire(&bk->lock);
    for(b = bk->head.next; b != &bk->head; b = b->next) {
      if(b->refcnt == 0) {
        if(min_ticks == -1 || min_ticks > b->ticks) {
          lrub = b;
          min_ticks = b->ticks;
          lrub_index = i;
        }
      }
    }
}

如果没有空闲缓存块,直接报错,这里处理的比较直接

if(!lrub) {
    for(i = 0; i < NBUCKET; i++) 
      release(&bcache.bucket[i].lock);   
    // release(&bcache.lock); 
    panic("bget: no buffers");
}

先释放与交换操作无关的哈希桶锁,允许其他线程同时访问这些未被占用的桶,从而降低锁竞争

for(i = 0; i < NBUCKET; i++) {
    if (i != lrub_index && i != index) {
      bk = &bcache.bucket[i];
      release(&bk->lock);   
    }
}

如果原哈希桶和移动的目标哈希桶一致,不用交换,直接释放哈希桶的锁

if(lrub_index == index) {
    release(&bcache.bucket[index].lock);
    // release(&bcache.lock);
}

如果不一致,从原桶链表摘除 LRU 缓存块后添加到新桶,只有这两个操作都完成了才能释放两个哈希桶的锁

lrub->prev->next = lrub->next;
lrub->next->prev = lrub->prev;

bk = &bcache.bucket[index];
lrub->prev = &bk->head;
lrub->next = bk->head.next;
bk->head.next->prev = lrub;
bk->head.next = lrub;
release(&bcache.bucket[index].lock);
release(&bcache.bucket[lrub_index].lock);
// release(&bcache.lock);

文章作者: AthenaCrafter
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 AthenaCrafter !
  目录