C++用Mutex实现读写锁

近期答辩完成了,想回头看看之前没做过的2PL。

实现2PL有4种方式:

  1. 死锁检测。本篇是为了做这个而实现的,做这个事情的原因是c++标准库的shared_mutex无法从外界告知获取锁失败。
  2. 如果需要等待,那么马上结束txn。C++中有try_lock这样的方式,如果上锁失败就返回false,这样就可以实现这个了。
  3. 如果需要等待,那么杀死当前已经获得锁的一方。
  4. 在上锁前对资源排序。

2和4是最简单的,没什么好说的。3比1略容易一些。

基本思路

一个读写锁应该具有以下特征:

  1. 多个读者可以同时访问
  2. 写者独占访问
  3. 写者与读者互斥
  4. 避免写者饥饿或读者饥饿
  5. 锁的递归使用

由于实现的锁不能够出现读饿死、写饿死的现象,所以我想到一个很简单的方法:先到先得。当然也许会有其他方案。

先到先得的方式下,如何判断一个线程是否该阻塞?

  1. 第一个写请求之前的所有读请求可以进行
  2. 如果第一个请求是写请求,那么只有这一个写请求可以进行
  3. 如果没有写请求,那么所有读都可以进行
  4. 如果没有读请求,那么第一个写请求可以进行。这实际是2的特殊情况
  5. 其他请求都不可以进行

我们画图来说明一下。

假定某一刻有这些请求被阻塞,现在考虑挑出来可以执行的线程来执行

C++用Mutex实现读写锁

队列中,第一个写请求之前的读都可以进行,所以此时1,2线程是可以执行的。它们读完后释放锁,于是在这个队列中删除了1,2

C++用Mutex实现读写锁

1,2删除后,3可以正常执行。6在环检测的时候被要求结束,然后线程3也结束了,所以此时所有的读都可以进行。

C++用Mutex实现读写锁

实现先到先得,可以通过记录正在进行读的线程数量,正在进行写的线程数量,请求写但是被阻塞的线程数量,请求读但是被阻塞的线程数量,然后根据条件来分配资源给某个线程……维护的信息数量可能不止这些,比如说需要维护哪些线程的读被阻塞了。

而环检测的2PL,我们需要在外界通知线程锁获取失败,所以选择了使用队列来实现,这个队列需要支持:

  1. 添加读者、写者(AddReader, AddWriter)
  2. 删除读者、写者(RemoveReader, RemoveWriter,为了简化,统一为一个Remove了)
  3. 当可以获得锁的时候,提醒可以获得锁的线程。这个可以用condition_variable实现
  4. 确定某个线程是否应该阻塞

然而做这样一个队列还是需要费一些功夫的。

队列实现

明确了功能需求后,考虑一下需要什么样的数据结构。普通的队列肯定是不够的,毕竟我们会删除其中任意一个元素,容易想到的是map/set。然后考虑到先到先得的顺序要求,可以考虑额外记录一个逻辑时间timestamp,每当一个请求到达,就递增timestamp。由于加入了timestamp,所以为了支持删除,至少需要tid:timestamp的映射。而为了支持按timestamp查询,至少需要timestamp:tid的映射。此外,需要记录一个请求是读还是写,所以一共需要tid:timestamp的映射和timestamp:<tid,读写标记>的映射。

timestamp:<tid,读写标记>映射关系,很容易想到通过std::map这种天然自带排序的数据结构来实现,即:

  1. 从最小到最大遍历开头的读请求,这部分线程可以直接执行。
  2. 如果是写请求开头的,那么这个写可以直接执行。
  3. 解锁的时候删除该线程的记录。

笔者在此前做了CMU15445,里面的GC的watermark和这个非常显相似。CMU15445中作者提到了可以使用unordered_map来将时间复杂度从O(logn)优化到O(1),这种做法我想到了,所以这里的队列使用的都是unordered_map。

#pragma once #ifndef READER_WRITER_QUEUE_H #define READER_WRITER_QUEUE_H // INSPIRED BY CMU15445 fall2023 watermark #include <cassert> #include <unordered_map> class ReaderWriterQueue {  public:   void AddReader(int tid) {     assert(tid_ts.count(tid) == 0);     ts_tt[next_timestamp] = {tid, TidType::kRead};     tid_ts[tid] = next_timestamp;     next_timestamp++;   }   void AddWriter(int tid) {     assert(tid_ts.count(tid) == 0);     ts_tt[next_timestamp] = {tid, TidType::kWrite};     tid_ts[tid] = next_timestamp;     next_timestamp++;   }   void Remove(int tid) {     auto ts = tid_ts.find(tid);     if (ts == tid_ts.end()) return;     assert(ts_tt.count(ts->second) == 1);     ts_tt.erase(ts->second);     tid_ts.erase(ts);   }   bool ShallBlock(int tid) {     ResetMinWriteTimestamp();     ResetMinTimestamp(); // 这两个timestamp处理可以合并     auto iter = tid_ts.find(tid);     assert(iter != tid_ts.end());     assert(ts_tt.count(iter->second) == 1);     auto ts = iter->second;     auto [_, type] = ts_tt[ts];     // 如果读者之前有写者,那么就需要阻塞等待     if (type == TidType::kRead) return ts > min_write_ts;      // 如果写者之前有读者,那么就需要阻塞等待     if (min_ts < min_write_ts) return true;     // 如果写者之前有写者,那么就需要阻塞等待     return ts_tt[min_write_ts].tid != tid;   }  private:   void ResetMinWriteTimestamp() {     for (; min_write_ts < next_timestamp; min_write_ts++) {       auto iter = ts_tt.find(min_write_ts);       if (iter == ts_tt.end()) {         continue;       } else if (iter->second.type == TidType::kWrite) {         break;       } else { // iter->second.type == TidType::kRead         continue;       }     }   }   void ResetMinTimestamp() {     for (; min_ts < next_timestamp; min_ts++) {       auto iter = ts_tt.find(min_ts);       if (iter != ts_tt.end())         break;     }   }   long next_timestamp = 0;   long min_write_ts = 0;   long min_ts = 0;   struct TidType {     int tid;     enum LockType {kRead, kWrite} type;     bool operator==(const TidType &rhs) const {       return tid == rhs.tid && type == rhs.type;     }   };   std::unordered_map<long, TidType> ts_tt;   std::unordered_map<int, long> tid_ts; }; #endif // READER_WRITER_QUEUE_H 

将队列封装为读写锁

这一步封装已经非常容易了,一个请求到来,添加到队列中。如果需要阻塞,那么就通过condition_variable等待通知。解锁的时候,不仅仅需要在队列中进行移除,还需要notify_all。notify_all还可以优化,但是这不是那么容易的事情了,不考虑。

#pragma once #ifndef SIMPLE_SHARED_MUTEX_H #define SIMPLE_SHARED_MUTEX_H #include <condition_variable> #include <ctime> #include <cstdio> #include <mutex> #include <unistd.h> #include "reader_writer_queue.h" class SimpleSharedMutex {  public:   void lock() {     std::unique_lock lock{mtx};     auto tid = ::gettid();     queue.AddWriter(tid);     while (queue.ShallBlock(tid)) cv.wait(lock);     // printf("lock %dn", tid);   }   void shared_lock() {     std::unique_lock lock{mtx};     auto tid = ::gettid();     queue.AddReader(tid);     while (queue.ShallBlock(tid)) cv.wait(lock);     // printf("slock %dn", tid);   }   void unlock() {     std::unique_lock lock{mtx};     queue.Remove(::gettid());     cv.notify_all();     // printf("ulock %dn", ::gettid());   }   void shared_unlock() {     std::unique_lock lock{mtx};     queue.Remove(::gettid());     cv.notify_all();     // printf("uslock %dn", ::gettid());   }  private:   std::mutex mtx;   ReaderWriterQueue queue;   std::condition_variable cv; }; #endif // SIMPLE_SHARED_MUTEX_H 

发表评论

评论已关闭。

相关文章