手写数据库连接池

📕数据库连接池项目

一、项目意义

在设计前先了解一下数据库连接池的作用:

除了在服务器端增加缓存服务器缓存常用的数据 之外(例如redis),还可以增加连接池,来提高MySQL Server的访问效率,在高并发情况下,大量的 TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的 性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗

二、环境配置

MySQ数据库编程环境配置:

在win10的vs项目中用C/C++客户端开发包,vs做如下配置:

1.右键项目属性 - C/C++ - 常规 - 附加包含目录,填写mysql.h头文件的路径

2.右键项目属性 - 链接器 - 常规 - 附加库目录,填写libmysql.lib的路径

3.右键项目属性 - 链接器 - 输入 - 附加依赖项,填写libmysql.lib库的名字

4.把libmysql.dll动态链接库(Linux下后缀名是.so库)放在工程目录下;

如果运行数据库连接文件出现:手写数据库连接池

说明系统环境变量中没有配置MySQL环境,找不到动态链接,解决方案:在系统环境变量添加MySQL的bin文件夹路径; 也可以在 右键项目属性 - 调试 - 环境,填写 PATH=自己MySQL的bin文件夹路径;

三、项目设计

所需要实现的数据库连接池功能:连接池有连接数量的起始值和阈值;连接池可以动态自动生成和回收连接池里面的资源;连接池和数据库信息等分离,达到复用简单的效果;

①设计一个数据库的的基本操作类

  • 封装数据库的连接、增删改查操作;
  • 给该基本操作类加上存活时间相关的属性和方法;

②数据库连接池类

  • 因为是一个池对应多个资源(对象)的关系,我们也只需要一个池,设计成单例模式;

  • 保存含有基本操作类一群对象资源;

  • 有资源的初始化、增加、释放操作(释放和存活时间相关)

四、详细代码

代码结构:

手写数据库连接池

①public.h: 该连接池的全局日志输出,打印一些错误在log中;

#pragma once /* * 定义宏等全局定义 */  #define LOG(str)  	cout << __FILE__ << ":" << __LINE__ << " " <<  	__TIMESTAMP__ << " : " << str << endl; 

②mysql.ini:mysql的详细信息

#数据库连接池的配置文件,和宏定义一样,注意行后面不要有空格 ip=127.0.0.1 port=3306 username=root password=root dbname=chat initSize=10 maxSize=1024 #最大空闲时间默认秒 maxIdleTime=60 #连接超时时间单位是毫秒 connectionTimeout=100 

③Connection.h 和 Connection.cpp :数据库操作类 头文件和实现;

Connection.h:

using namespace std; #include "public.h"  // 数据库操作类 class Connection { public: 	// 初始化数据库连接 	Connection(); 	// 释放数据库连接资源 	~Connection(); 	// 连接数据库 	bool connect(string ip, unsigned short port, string user, string password, string dbname); 	// 更新操作 insert、delete、update 	bool update(string sql); 	// 查询操作:select; 	MYSQL_RES* query(string sql);  	//刷新一下连接的起始空闲时间点 	void refreshAliveTime() { _alivetime = clock(); }//clock()函数当下时间 	// 返回存活的时间 	clock_t getAliveTime() { return clock() - _alivetime; }  private: 	MYSQL* _conn; // 表示和MySQL Server的一条连接 	clock_t _alivetime;// 记录进入空闲状态后的存活时间 }; 

Connection.cpp:

#include "Connection.h" #include "public.h"  // 初始化数据库连接 Connection::Connection() { 	_conn = mysql_init(nullptr); } // 释放数据库连接资源 Connection::~Connection() { 	if (_conn != nullptr) 		mysql_close(_conn); } // 连接数据库 bool Connection::connect(string ip, unsigned short port, string username, string password, string dbname) { 	MYSQL* p = mysql_real_connect(_conn, ip.c_str(), username.c_str(), password.c_str(), dbname.c_str(), port, nullptr, 0); 	return p != nullptr; } // 更新操作 insert、delete、update bool Connection::update(string sql) { 	if (mysql_query(_conn, sql.c_str()))//如果查询成功,返回0。如果出现错误,返回非0值。 	{ 		LOG("更新失败:" + sql); 		return false; 	} 	return true; } // 查询操作:select; MYSQL_RES* Connection::query(string sql) { 	if (mysql_query(_conn, sql.c_str())) 	{ 		LOG("查询失败:" + sql); 		return nullptr; 	} 	return mysql_use_result(_conn); }  

④MySQLConnectionPool.h和MySQLConnectionPool.cpp:连接池类头文件和实现

MySQLConnectionPool.h:

#pragma once #include <iostream> #include <string> #include <queue> #include <mutex> #include <condition_variable> #include <atomic> #include <thread> #include <memory> #include <functional> #include "Connection.h"  using namespace std; /* * 实现连接池模块 */ class ConnectionPool { public: 	//获取连接池对象实例 	static ConnectionPool* getConnectionPool(); 	// 消费者线程函数:给用户连接,归还时放回连接池 	shared_ptr<Connection> getConnection();  private: 	//单例#1 构造函数私有化 	ConnectionPool();   	//从配置文件中加载配置 	bool loadConfigFile();  	//生产者线程函数:运行独立的线程中,负责生产新连接,放在类内方便访问成员变量 	void produceConnectionTask();  	//回收线程函数:扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收 	void scannerConnectionTask();  	string _ip;// mysql ip 	unsigned short _port; //mysql 端口 默认3306 	string _username;// mysql 用户名; 	string _password;// mysql登录秘密 	string _dbname; // 数据库名称 	int _initSize;// 连接池的初始连接量 	int _maxSize;// 连接池的最大连接量 	int _maxIdleTime; //连接池最大空闲时间 	int _connectionTimeout;//连接池获取连接的超时时间  	queue<Connection*> _connectionQue; //存储mysql连接的队列,必须是线程安全的; 	mutex _queueMutex;//维护连接队列的线程安全互斥锁 	atomic_int _connectionCnt;// 记录连接所创建的connection连接的总数量,考虑了连接生产消费数量变化的线程安全问题 	condition_variable cv; //设置条件变量,用于连接 生产线程和消费线程的通信 };   

MySQLConnectionPool.cpp:

// MySQlConnectionPool.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 //避免重复包含头文件 #ifndef __COMPLEX__ #define __COMPLEX__  #include <iostream> #include <string> #include "MySQLConnectionPool.h" #include "public.h"  #endif; // 线程安全的懒汉单例函数接口 ConnectionPool* ConnectionPool::getConnectionPool() { 	static ConnectionPool pool; // 静态变量实现lock和unlock,拿单例的线程池; 	return &pool; }  // 从配置文件中加载配置项 bool ConnectionPool::loadConfigFile() { 	FILE* pf = fopen("mysql.ini", "r"); 	if (pf == nullptr) 	{ 		LOG("mysql.ini file is not exist!"); 		return false; 	}  	while (!feof(pf))//末尾查一下 	{ 		char line[1024] = { 0 }; 		fgets(line, 1024, pf); 		string str = line; 		int idx = str.find('=', 0);//找出第一个出现=号的下标 		if (idx == -1)//找不到,无效配置项 		{ 			continue; 		}  		//实际中是由n结尾的,password=rootn 		int endidx = str.find('n', idx); 		string key = str.substr(0, idx); //参数意义:截取的起点以及截取长度 		string value = str.substr(idx + 1, endidx - idx - 1);  		//存值 		if (key == "ip") _ip = value; 		else if (key == "port") _port = atoi(value.c_str()); 		else if (key == "username") _username = value; 		else if (key == "password") _password = value; 		else if (key == "dbname") _dbname = value; 		else if (key == "initSize") _initSize = atoi(value.c_str()); 		else if (key == "maxSize") _maxSize = atoi(value.c_str()); 		else if (key == "maxIdleTime") _maxIdleTime = atoi(value.c_str()); 		else if (key == "connectionTimeout") _connectionTimeout = atoi(value.c_str());  	} }  // 连接池的构造 ConnectionPool::ConnectionPool() { 	//加载配置项 	if (!loadConfigFile()) 	{ 		return; 	}  	//创建初始数量的连接 	for (int i = 0; i < _initSize; ++i) 	{ 		Connection* p = new Connection(); 		p->connect(_ip, _port, _username, _password, _dbname); 		p->refreshAliveTime();// 刷新一下开始空闲起始时间; 		_connectionQue.push(p); 		_connectionCnt++; 	}  	//需要启动一个新线程,作为连接的生产者(生产者线程)  	//c++的线程函数在linux里面底层也是pthread_creat,需要传入c接口,所以传入类方法需要绑定 	thread produce(std::bind(&ConnectionPool::produceConnectionTask, this)); 	produce.detach();  	//启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收 	thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this)); 	scanner.detach(); }  //生产者线程:运行独立的线程中,负责生产新连接,放在类内方便访问成员变量 void ConnectionPool::produceConnectionTask() { 	for (;;) 	{ 		unique_lock<mutex> lock(_queueMutex); 		while (!_connectionQue.empty()) 		{ 			cv.wait(lock); //队列不为空,此处生产线程进入等待状态,释放锁 		}  		//可以生产新连接,创建新连接 		if (_connectionCnt < _maxSize) {  			Connection* p = new Connection(); 			p->connect(_ip, _port, _username, _password, _dbname); 			p->refreshAliveTime();// 刷新一下开始空闲起始时间;		 			_connectionQue.push(p); 			_connectionCnt++; 		}  		//通知消费者线程可以消费连接了 		cv.notify_all(); 	} }  // 消费者线程:给用户连接,从连接池中获取一个可用的空闲连接 shared_ptr<Connection> ConnectionPool::getConnection() { 	unique_lock<mutex> lock(_queueMutex); 	//if (_connectionQue.empty())//空的就让生产者生产 	//{ 	//	//不可以用sleep,sleep是直接睡,而wait_for是被通知就可以马上继续走 	//	cv.wait_for(lock, chrono::milliseconds(_connectionTimeout));//毫秒,超过时间没有被唤醒的话也会出来 	//	if (_connectionQue.empty()) 	//	{ 	//		LOG("获取空闲连接超时了!!!获取连接失败"); 	//		return nullptr; 	//	} 	//}  	//上述没有考虑好,有可能等待过程中是被唤醒的,但是拿锁慢,还是被拿走了锁 	//优化一下: 	while (_connectionQue.empty()) 	{ 		if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout))) 		{ 			//是真的超时了, 并且连接池为空 			if (_connectionQue.empty()) 			{ 				LOG("获取空闲连接超时了!!!获取连接失败"); 				return nullptr; 			}			 		} 	}  	//有连接在池子里 	/* 	shared_ptr智能指针析构时,默认会调用connection析构函数,connection就会被close 	这里就需要自定义share_ptr的释放资源方式:把connection直接归还到_connectionQue中; 	*/ 	shared_ptr<Connection> sp(_connectionQue.front(), 		[&](Connection* pcon) { 			//这里是在服务器(多线程)消费者线程中调用的,涉及了共享数据,所以一定要考虑队列的线程安全操作 			unique_lock<mutex> lock(_queueMutex); 			pcon->refreshAliveTime();// 刷新一下开始空闲起始时间; 			_connectionQue.push(pcon); 		}); 	_connectionQue.pop(); 	//if (_connectionQue.empty()) //这样写也可以 	//{ 	//	cv.notify_all();//消费完连接后发现队列为空,通知生产者线程; 	//} 	cv.notify_all();//消费完连接后,通知生产者线程检查线程池是否为空; 	return sp; }  //回收线程函数:扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收 void ConnectionPool::scannerConnectionTask() { 	for (;;) 	{ 		//通过sleep模拟定时效果 		this_thread::sleep_for(chrono::seconds(_maxIdleTime)); 	 		// 扫描整个队列,释放多余连接 		unique_lock<mutex> lock(_queueMutex); 		while (_connectionCnt > _initSize) 		{ 			Connection* p = _connectionQue.front(); 			//这里都释放?应该释放大于initSize以上的? 			if (p->getAliveTime() > _maxIdleTime * 1000) //##队头的空闲时间是最长的,只用看队头就行 			{ 				_connectionQue.pop(); 				_connectionCnt--; 				delete p;//调用connectin析构函数 			} 			else 			{ 				break;//队头都小于,后面肯定小; 			} 		} 	} } 

五、代码测试

进行压力测试对比一下使用连接池和不使用连接池的效果;

测试代码:main函数手动测试

#include <iostream> #include <thread> #include "Connection.h" #include "MySQLConnectionPool.h"   using namespace std;   int main() { 	/* 	 * 数据库测试 	*/ 	//Connection conn; 	//char sql[1024] = { 0 }; 	//sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//conn.connect("127.0.0.1", 3306, "root", "root", "chat"); 	//conn.update(sql);  	//压力测试: 	//  	//①不用连接池,单线程,更改数据1000、5000、10000 	clock_t begin = clock(); 	for (int i = 0; i < 1000; ++i) { 		Connection conn; 		char sql[1024] = { 0 }; 		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 		conn.connect("127.0.0.1", 3306, "root", "root", "chat"); 		conn.update(sql); 	} 	clock_t end = clock(); 	cout << end - begin << "ms" << endl;  	//②用连接池单线程,更改数据1000、5000、10000 	//clock_t begin = clock(); 	//ConnectionPool *cp = ConnectionPool::getConnectionPool(); 	//for (int i = 0; i < 5000; ++i) { 	//	shared_ptr<Connection> sp= cp->getConnection(); 	//	char sql[1024] = { 0 }; 	//	sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//	sp->update(sql); 	//} 	//clock_t end = clock(); 	//cout << end - begin << "ms" << endl;  	//③不用连接池的4线程 	//不能在多线程中同时连接数据库,是非法的,需要先在外面声明连接 	//Connection conn; 	//conn.connect("127.0.0.1", 3306, "root", "root", "chat"); 	//clock_t begin = clock(); 	//thread t1([]() { 	//	for (int i = 0; i < 250; ++i) { 	//		Connection conn; 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat"); 	//		conn.update(sql); 	//	} 	//	}); 	//thread t2([]() { 	//	for (int i = 0; i < 250; ++i) { 	//		Connection conn; 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat"); 	//		conn.update(sql); 	//	} 	//	}); 	//thread t3([]() { 	//	for (int i = 0; i < 250; ++i) { 	//		Connection conn; 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat"); 	//		conn.update(sql); 	//	} 	//	}); 	//thread t4([]() { 	//	for (int i = 0; i < 250; ++i) { 	//		Connection conn; 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat"); 	//		conn.update(sql); 	//	} 	//	}); 	//t1.join(); 	//t2.join(); 	//t3.join(); 	//t4.join(); 	//clock_t end = clock(); 	//cout << end - begin << "ms" << endl;  	//④用连接池的4线程 	//clock_t begin = clock(); 	//thread	t1([]() { 	//	ConnectionPool* cp = ConnectionPool::getConnectionPool(); 	//	for (int i = 0; i < 250; ++i) { 	//		shared_ptr<Connection> sp = cp->getConnection(); 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		sp->update(sql); 	//	} 	//	}); 	//thread	t2([]() { 	//	ConnectionPool* cp = ConnectionPool::getConnectionPool(); 	//	for (int i = 0; i < 250; ++i) { 	//		shared_ptr<Connection> sp = cp->getConnection(); 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		sp->update(sql); 	//	} 	//	}); 	//thread	t3([]() { 	//	ConnectionPool* cp = ConnectionPool::getConnectionPool(); 	//	for (int i = 0; i < 250; ++i) { 	//		shared_ptr<Connection> sp = cp->getConnection(); 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		sp->update(sql); 	//	} 	//	}); 	//thread	t4([]() { 	//	ConnectionPool* cp = ConnectionPool::getConnectionPool(); 	//	for (int i = 0; i < 250; ++i) { 	//		shared_ptr<Connection> sp = cp->getConnection(); 	//		char sql[1024] = { 0 }; 	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male"); 	//		sp->update(sql); 	//	} 	//	}); 	//t1.join(); 	//t2.join(); 	//t3.join(); 	//t4.join(); 	//clock_t end = clock(); 	//cout << end - begin << "ms" << endl;  	return 0; } 

刚开始连接速度,插入速度极慢的原因:

是因为 mysql8.0 一些设置是默认开启的(5.7 是默认关闭的),而这些设置可能会严重影响数据库性能

执行以下优化:

最后我还是换成了5.7的版本进行测试:

数据量 未使用连接池所耗时间 使用连接池所耗时间
1000 单线程:1886ms 四线程:495ms 单线程:1078ms 四线程:406ms
5000 单线程:10032ms 四线程:2368ms 单线程:5328ms 四线程:2033ms
10000 单线程:19407ms 四线程:4579ms 单线程:10532ms四线程:4041ms

锻炼的技术点:MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和 unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型;

发表评论

评论已关闭。

相关文章