生产者消费者模型
概念: 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过一个来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产消费者模型优点:
- 解耦:生产者和消费者是通过一个共享数据区域来进行通信。而不是直接进行通信,这样两个角色之间的依耐性就降低了(代码层面实现解耦),变成了角色与共享数据区域之间的弱耦合,一个逻辑出错不影响两一个逻辑,二者变得更独立。
- 支持并发:生产者负责生产数据,消费者负责拿数据。生产者生产完数据可以继续生产,大部分时间内是不需要等待消费者消费数据才继续生产。也就是说,在任一时刻,二者都是在正常处理任务的,进度都得以推进。
- 支持忙闲下不均:生产者生产了数据是放进容器中,消费者不必立即消费,可以慢慢地从容器中取数据。容器快要空了,消费者的消费速度就可以降下来,让生产者继续生产。
生产消费模型特征(简记321):
- 3种关系: 生产者与生产者(互斥)、生产者与消费者(同步(主要)和互斥)和消费者与消费者(互斥)
- 两个角色: 生产者和消费者
- 一个交易场所: 容器、共享资源等
基于阻塞队列的生产者消费者模型
阻塞队列的特点:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
- 在这个模型中,生产者和消费者的交易场所就是阻塞队列
实现
概述:
因为交易场所是一个阻塞队列,所以,我封装了一个BlcokingQueue 的类,这里提供了放数据和取数据这样两个主要的方法。其中,有五个成员变量:
- 队列: 使用STL中的queue来实现
- 容量: 阻塞队列的容量,由用户给定,我们也可以提供一个默认的容量
- 互斥量: 为了实现生产者和消费者的同步,我们需要使用条件变量和互斥量来实现同步的操作
- 生产者唤醒和等待的条件变量: 当队列满了,生产者等待条件满足,应该挂起等待,等待消费者唤醒
- 消费者唤醒和等待的条件变量: 当队列为空,消费者等待条件满足,应该挂起等待,等待生产者唤醒
构造函数和析构函数:
-
构造函数做初始化和资源分配的操作,分配锁资源和条件变量
-
析构函数做清理资源的操作,对锁和条件变量进行销毁
队列类的整体架构
#include<iostream> #include<string.h> using namespace std;//标准命名空间 //类模板 template<class T> class BlockQueue { public: //构造函数,化容量为5 BlockQueue(int capacity = 5) :_capacity(capacity) { //初始化锁和互斥量 pthread_mutex_init(&_lock,nullptr); pthread_cond_init(&_c_cond,nullptr); pthread_cond_init(&_p_cond,nullptr); } //析构函数 ~BlockQueue() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_c_cond); pthread_cond_destroy(&_p_cond); } private: //队列容器 queue<T> _q; size_t _capacity;//队列最大容器 pthread_mutex_t _lock;//互斥锁 pthread_cond_t _c_cond;//消费者被唤醒和挂起的条件变量 pthread_cond_t _p_cond;//生产者被唤醒和挂起的条件变量 }
基本方法的封装
我对阻塞队列的一些基本操作进行了封装,有以下几个处理动作(可以设置为私有方法):
- 判断队列为空或为满
- 唤醒生产者和唤醒消费者
- 生产者挂起等待和消费者挂起等待
- 加锁和解锁
private: bool IsFull() { return _q.size() == _capacity; } bool IsEmpty() { return _q.empty(); } void ConsumerWait() { cout << "consumer wait...." << endl; pthread_cond_wait(&_c_cond, &_lock); } void WakeUpConsumer() { cout << "wake up consumer...." << endl; pthread_cond_broadcast(&_c_cond); } void ProductorWait() { cout << "productor wait...." << endl; pthread_cond_wait(&_p_cond, &_lock); } void WakeUpProductor() { cout << "wake up productor...." << endl; pthread_cond_broadcast(&_p_cond); } void LockQueue() { pthread_mutex_lock(&_lock); } void UnLockQueue() { pthread_mutex_unlock(&_lock); }
放数据和取数据
- 生产者进行相关操作前先上锁,队列如果为满就需要挂起等待。队列不为满就生成一个数据,然后需要把数据放入阻塞队列中,解开锁之后唤醒消费者,喊消费者进行消费。
- 消费者进行相关操作前先上锁,队列如果为空就需要挂起等待。队列不为空就需要从阻塞队列中取一个数据,然后解开锁之后唤醒消费者,喊生产者进行生产。
void ProductData(T data) { LockQueue(); while (IsFull()){ // 让productor挂起等待 ProductorWait(); } // 放数据 _q.push(data); UnLockQueue(); // 唤醒consumer WakeUpConsumer(); } void ConsumeData(T& data) { LockQueue(); while (IsEmpty()){ // 让consumer挂起等待 ConsumerWait(); } // 取数据 data = _q.front(); _q.pop(); UnLockQueue(); // 唤醒productor WakeUpProductor(); }
注意: 在临界资源判断唤醒条件是否就绪应该使用while循环检测,被唤醒的线程并不着急立即往下执行,而是再进行一次检测,判断当前唤醒条件是否真的就绪了。因为唤醒线程的这个函数调用可能会发生失败,且线程可能是在条件不满足的时候被唤醒,发生误判被伪唤醒。
封装一个任务
我们可以实现一个任务类,生产者把这个任务放进阻塞队列中,消费者取出并进行处理。其中还有一个run的任务执行方法
class Task { public: Task(int a = 0, int b = 0) :_a(a) ,_b(b) {} int Run() { return _a + _b; } int GetA() { return _a; } int GetB() { return _b; } private: int _a; int _b; };
单生产者和单消费者模型分析
BlockQueue<Task>* q;// 阻塞队列 void* Consumer(void* arg) { long id = (long)arg; while (1){ // 消费(取)数据 Task t(0, 0); q->ConsumeData(t); cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << endl; sleep(1);// 后面可注释,调整速度 } } void* Productor(void* arg) { long id = (long)arg; while (1){ // 生产(放)数据 int x = rand()%10 + 1; int y = rand()%10 + 1; Task t(x, y); cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << endl; q->ProductData(t); sleep(1);// 后面可注释,调整速度 } } int main() { srand((size_t)time(nullptr)); // 创建一个交易场所 q = new BlockQueue<Task>; pthread_t p, c; pthread_create(&p, nullptr, Productor, (void*)(1)); pthread_create(&c, nullptr, Consumer, (void*)(1)); pthread_join(p, nullptr); pthread_join(c, nullptr); delete q; return 0; }
代码运行的结果分三种情况分析:
1.消费者和生产者以相同的速度执行
可以看出的是,生产者生成完一个任务,消费者就处理了一个任务。接着生产者继续生产,消费者跟着处理,二者步调一致,一直并行的状态
2.生产者快,消费者慢
生产者生产速度快,导致一上来生产者就把队列塞满了任务,接着唤醒消费者来消费,然后挂起等待。紧接着消费者处理完一个任务就唤醒生产者来生产,生产者生产了一个任务就喊消费者来消费,然后继续挂起。可以看出的是,在这种情况下,队列长时间是满的,生产者大部分时间是挂起等待的。生产者和消费者开始一小部分时间内步调是不一致的,生产者生产完,消费者才消费是串行的,但是过了一会,二者步调就变得一致了,且速度是随消费者的
3.生产者慢,消费者快
生产者的速度慢,生产者生产一个任务立马唤醒消费者,消费者处理完一个数据,发现队列为空,然后挂起等待,接着生产者继续生产一个任务,然后唤醒消费者。可以看出的是,队列大部分时间是为空的,消费者大部分时间是处于挂起等待的,二者步调一直是一致的,且执行速度是随生产者的,也是并行的
多生产者和多消费者模型分析
做到几点:
- 生产者之间需要互斥,也就是生产者和生产者之间需要组内竞争一把锁,消费者也是如此
- 生产者和消费者之间用互斥量和条件变量做到同步和互斥(上面就做到了)
#define P_COUNT 3 #define C_COUNT 3 BlockQueue<Task>* q; pthread_mutex_t c_lock;// 消费者的锁 pthread_mutex_t p_lock;// 生产者的锁 void* Consumer(void* arg) { long id = (long)arg; while (1){ pthread_mutex_lock(&c_lock); // 消费(取)数据 Task t(0, 0); q->ConsumeData(t); cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << endl; pthread_mutex_unlock(&c_lock); sleep(1); } } void* Productor(void* arg) { long id = (long)arg; while (1){ pthread_mutex_lock(&p_lock); // 生产(放)数据 int x = rand()%10 + 1; int y = rand()%10 + 1; Task t(x, y); cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << endl; q->ProductData(t); pthread_mutex_unlock(&p_lock); sleep(1); } } int main() { srand((size_t)time(nullptr)); pthread_mutex_init(&c_lock, nullptr); pthread_mutex_init(&p_lock, nullptr); // 创建一个交易场所 q = new BlockQueue<Task>; pthread_t p[P_COUNT]; pthread_t c[C_COUNT]; for (long i = 0; i < P_COUNT; ++i) { pthread_create(p+i, nullptr, Productor, (void*)(i+1)); } for (long i = 0; i < C_COUNT; ++i) { pthread_create(c+i, nullptr, Consumer, (void*)(i+1)); } for (int i = 0; i < C_COUNT; ++i) { pthread_join(c[i], nullptr); } for (int i = 0; i < P_COUNT; ++i) { pthread_join(p[i], nullptr); } pthread_mutex_destroy(&c_lock); pthread_mutex_destroy(&p_lock); delete q; return 0; }
注意:
- 生产者之间需要一个互斥量,消费者之间也需要一个互斥量
- 生产者和消费者的个数可以自己调整宏变量
运行结果如下:
POSIX信号量
介绍
POSIX信号量: 该信号量允许进程和线程同步对共享资源的访问。同时也可以用于实现线程间同步。
总结几点:
- 是什么? 信号量本质是一个计数器,描述临界资源的有效个数。申请一个资源就对信号量减1(P操作),释放一个资源就对信号量加1(V操作)
- 为什么? 临界资源可以看成很多份,互相不冲突且高效
- 怎么用? 可以使用信号量的相关接口,来申请信号量和释放信号量(下面详细介绍)
相关接口的介绍
下面要介绍的POSIX信号量相关接口都是在semaphore.h的头文件中。信号量是一个类型为sem_t的变量
- 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value); 功能: 创建一个信号量并初始化它的值。一个无名信号量在被使用前必须先初始化 参数: sem:信号量的地址 pshared:等于0,信号量在线程间共享(常用)。不等于0,信号量在进程间共享 value:信号量的初始值 返回值: 成功:0 失败:-1 信号量的初始值就是可用资源的个数
- 销毁信号量
int sem_destroy(sem_t *sem); 功能: 删除sem标识的信号量 参数: sem:信量地址 返回值: 成功:0 失败:-1
- 信号量P操作(减一)
int sem_wait(sem_t *sem); 功能: 将信号量的值减1。操作前,先检查信号量(sem)的值是否为0,若信号量为0,此函数会阻塞,直到信号量大于0时才进行减一操作 参数: sem:信号量的地址 返回值: 成功:0 失败:-1 int sem_trywait(sem_t *sem); 以非阻塞的方式来对信号量进行减1操作 若操作前,信号量的值等于0,则对信号量的操作失败,函数立即返回 int sem_timedwait(sem_t* sem,const struct timespec *abs_timeout); 限时尝试将信号量的值减1 abs_timeout:绝对时间
- 信号量V操作(加一)
int sem_post(sem_t *sem); 功能: 将信号量的值加1并发出信号唤醒等待线程(sem_wait()) 参数: sem:信号量的地址。 返回值: 成功:0
- 获取信号量的值
int sem_getvalue(sem_t *sem,int *Sval); 功能: 获取sem标识的信号量的值,保存在sval中 参数: sem:信号量地址 sval:保存信号值的地址 返回值: 成功:0 失败:-1
举个简单的例子:
初始化信号量的资源数是1,意思就是系统中只有一份资源,线程1如果要申请这份资源,首先要进行p操作,sem_wait()函数,那么资源数-1,此时资源数是0,说明系统中没有可用的资源,其他线程在进行p操作的时候就会阻塞,线程1执行完毕,执行v操作,资源数+1,系统中的资源被释放出来,线程2此时解除阻塞状态,步骤和线程1一样。
代码示例:
#include <iostream> #include <unistd.h> #include <pthread.h> #include <semaphore.h> using namespace std; sem_t sem; void* run1(void* arg) { while (1){ sem_wait(&sem); cout << "run1 is running..." << endl; sem_post(&sem); sleep(1); } } void* run2(void* arg) { while (1){ sem_wait(&sem); cout << "run2 is running..." << endl; sem_post(&sem); sleep(1); } } int main() { sem_init(&sem, 0, 1); pthread_t t1, t2; pthread_create(&t1, nullptr, run1, nullptr); pthread_create(&t2, nullptr, run2, nullptr); sem_destroy(&sem); pthread_join(t1, nullptr); pthread_join(t2, nullptr); return 0; }
运行结果如下:
注意:当信号量的初始值为1的时候,就相当于一把互斥锁
基于环形队列的生产消费模型
环形队列介绍
环形队列: 环形队列和普通队列的区别就是,这种队列是一种环形的结构,有一个头指针和一个尾指针维护环中一小段队列。
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
这就类似于数据结构中学的循环队列类似。
因为信号量就是一个计数器,所以我们可以通过信号量来实现多线程间的同步过程。
实现
一个交易场所: 循环队列
两个角色:
- 生产者:需要申请空间资源(P操作),然后释放数据资源(V操作)
- 消费者:需要申请数据资源(P操作),然后释放空间资源(V操作)
- 三种关系: 生产者与生产者(互斥)、生产者与消费者(同步(主要)和互斥)和消费者与消费者(互斥)
几个变量成员:
- 队列:数组模拟
- 容量:由用户给定
- 空间资源信号量:队列的容量大小
- 数据资源信号量:开始为0
- 生产者的下标位置:开始为0
- 消费者的下标位置:开始为0
代码示例:
#include<iostream> #include<string.h> #include<vector> #include<semaphore.h> #include<pthread.h> #include<unistd.h> using namespace std; //生产者:需要申请空间资源(P操作),然后释放数据资源(V操作) //消费者:需要申请空间资源(P操作),然后释放空间资源(V操作) template<class T> class RingQueue { public: RingQueue(int capacity = 5):_capacity(capacity),_rq(capacity),_c_index(0),_p_index(0) { //初始化空间资源信号量,容量为5 sem_init(&_blank_sem,0,_capacity); sem_init(&_data_sem,0,0); } ~RingQueue() { sem_destroy(&_blank_sem); sem_destroy(&_data_sem); } private: void P(sem_t& sem) { sem_wait(&sem); } void V(sem_t& sem) { sem_post(&sem); } public: void GetData(T& data) { // consumer申请数据资源 P(_data_sem); data = _rq[_c_index]; _c_index = (_c_index + 1) % _capacity; // consumer释放格子资源 V(_blank_sem); } void PutData(const T& data) { // productor申请格子资源 P(_blank_sem); _rq[_p_index] = data; _p_index = (_p_index + 1) % _capacity; // productor释放数据资源 V(_data_sem); } private: //vector容器模拟队列 vector<T> _rq; //队列的容量 size_t _capacity; //空间资源信号量 sem_t _blank_sem; //数据资源信号量 sem_t _data_sem; //消费者的下标位置:开始为0 int _c_index; //生产者的下标位置:开始为0 int _p_index; }; RingQueue<int> *q ; void* Consumer(void* arg) { long id = (long)arg; while (1){ // 消费(取)数据 int x; q->GetData(x); cout << "consumer " << id << " consumes a data: " << x << endl; sleep(1);// 后面可注释,调整速度 } } void* Productor(void* arg) { long id = (long)arg; while (1){ // 生产(放)数据 int x = rand()%10 + 1; q->PutData(x); cout << "productor " << id << " produncs a data: " << x<< endl; sleep(1);// 后面可注释,调整速度 } } int main() { // 创建一个交易场所 q = new RingQueue<int>(); srand((size_t)time(nullptr)); pthread_t p, c; pthread_create(&p, nullptr, Productor, (void*)(1)); pthread_create(&c, nullptr, Consumer, (void*)(1)); pthread_join(p, nullptr); pthread_join(c, nullptr); delete q; return EXIT_SUCCESS; }
注意:生产者生成数据前需要申请空间资源信号量(P(_blank_sem)),申请不成功就挂起等待,等待信号量来了继续获得信号量,然后释放数据资源信号量(V(_data_sem))
消费者消费数据前需要申请数据资源信号量(P(_data_sem)),申请不成功就挂起等待,等待信号量来了继续获得信号量,然后释放空间资源信号量(V(_blank_sem))
运行结果如下:
- 生产者和消费者执行速度一致
生产者生产完一个数据,然后消费者就消费了,二者步调一致,并发执行。
- 生产者快,消费者慢
生产者速度快,一下字就把队列塞满了数据(开始时二者步调不一致),接着生产者如果再去申请空间信号量,此时已经申请不到了,只能挂起等待,消费者消费数据是否空间信号量,这是生产者才可以继续生产,可以看出,在后面大部分时间,二者步调恢复一致了,且速度随消费者。
- 生产者慢,消费者快
生产生产者生产完一个数据,数据信号量加1,空间信号量减1,然后消费者里马消费了一个数据,数据信号量减1,空间信号量加1,此时数据信号量为0,消费者再去申请数据信号量,申请不到就挂起等待,只能等生产者在去生产释放空间信号量,然后消费者才可以申请到。可以看出的是,队列长时间是空的,二者步调一致,速度随生产者。