基于C++11的数据库连接池实现

0.注意

该篇文章为了让大家尽快看到效果,代码放置比较靠前,看代码前务必看下第4部分的基础知识。

1.数据库连接池

1.1 是什么?

数据库连接池负责分配、管理和释放数据库连接,属于池化机制的一种,类似的还有线程池等。

1.2 为什么用?

各种池化技术的使用原因都是类似的,也就是单独操作比较浪费系统资源,利用池提前准备一些资源,在需要时可以重复使用这些预先准备的资源,从而减少系统开销,实现资源重复利用。对于数据库连接来关闭来说,需要经过四步:
(1)建立通信连接的 TCP 三次握手
(2)数据库服务器的连接认证
(3)数据库服务器关闭连接时的资源回收
(4)断开通信连接的 TCP 四次挥手
而利用数据库连接池则减少了这几步的系统开销,更加的高效。

1.3如何设计?

原理类似于线性池,在数据库连接池中提前创建好多个数据库连接,使用时从数据库连接池中取出,使用完放回数据库连接池。数据库连接池中的数据库连接调度由数据库连接池调度。

2.基于C++11的实现

Talk is cheap. Show me the code.

直接看程序,原理、函数在后面再介绍。

2.1程序

(0)全部代码:
下载之一即可
百度网盘链接:https://pan.baidu.com/s/1wvcLn0CgZxbDpYdapDVUow?pwd=bnd9 提取码:bnd9
阿里云盘连接:https://www.aliyundrive.com/s/Emsy9UJLxiv 提取码: h46t
夸克网盘链接:https://pan.quark.cn/s/58eb69a3f0fb 提取码:iRiw
(1)MysqlConn.h

#pragma once #include <iostream> #include <mysql.h> #include <chrono> using namespace std; using namespace chrono; class MysqlConn { public:     // 初始化数据库连接     MysqlConn();     // 释放数据库连接     ~MysqlConn();     // 连接数据库     bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);     // 更新数据库: insert, update, delete     bool update(string sql);     // 查询数据库     bool query(string sql);     // 遍历查询得到的结果集     bool next();     // 得到结果集中指定位置的字段值     string value(int index);     // 事务操作(提交方式)     bool transaction();     // 提交事务     bool commit();     // 事务回滚      bool rollback();     // 刷新起始的空闲时间点     void refreshAliveTime();     // 计算连接存活的总时长     long long getAliveTime(); private:     void freeResult();//释放m_result空间     MYSQL* m_conn = nullptr;     MYSQL_RES* m_result = nullptr;     MYSQL_ROW m_row = nullptr;     steady_clock::time_point m_alivetime;//当前时间点 }; 

(2)MysqlConn.cpp

#include "MysqlConn.h"  MysqlConn::MysqlConn() {     m_conn = mysql_init(nullptr);//初始化mysql     mysql_set_character_set(m_conn, "utf8");//设置编码格式维utf8 }  MysqlConn::~MysqlConn() {     if (m_conn != nullptr)     {         mysql_close(m_conn);     }     freeResult(); }  bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port) {     MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);     return ptr != nullptr; }  bool MysqlConn::update(string sql) {     if (mysql_query(m_conn, sql.c_str()))     {         return false;     }     return true; }  bool MysqlConn::query(string sql) {     freeResult();     if (mysql_query(m_conn, sql.c_str()))     {         return false;     }     m_result = mysql_store_result(m_conn);     return true; }  bool MysqlConn::next() {     if (m_result != nullptr)     {         m_row = mysql_fetch_row(m_result); //检索结果集的下一行,如果没有要检索的行,mysql_fetch_row()返回NULL         if (m_row != nullptr)         {             return true;         }     }     return false; }  string MysqlConn::value(int index) {     int rowCount = mysql_num_fields(m_result);     if (index >= rowCount || index < 0)     {         return string();     }     char* val = m_row[index];     unsigned long length = mysql_fetch_lengths(m_result)[index];//为了避免下一步在“/0”处被截断     return string(val, length); }  bool MysqlConn::transaction() {     return mysql_autocommit(m_conn, false);//事务提交方式改为手动提交 }  bool MysqlConn::commit() {     return mysql_commit(m_conn); }  bool MysqlConn::rollback() {     return mysql_rollback(m_conn); }  void MysqlConn::refreshAliveTime() {     m_alivetime = steady_clock::now(); }  long long MysqlConn::getAliveTime() {     nanoseconds res = steady_clock::now() - m_alivetime;     milliseconds millsec = duration_cast<milliseconds>(res);     return millsec.count(); }  void MysqlConn::freeResult() {     if (m_result)     {         mysql_free_result(m_result);         m_result = nullptr;     } } 

(3)ConnectionPool.h

#pragma once #include <queue> #include <mutex> #include <condition_variable> #include "MysqlConn.h" using namespace std; class ConnectionPool { public:     //单例模式     static ConnectionPool* getConnectPool();     ConnectionPool(const ConnectionPool& obj) = delete;     ConnectionPool& operator=(const ConnectionPool& obj) = delete;     ~ConnectionPool();      shared_ptr<MysqlConn> getConnection();//任务从连接池中获取一个连接      private:     ConnectionPool();//单例模式      bool parseJsonFile();//解析Json配置文件     void produceConnection();//生产新的连接     void recycleConnection();//回收多余连接     void addConnection();//添加单个连接      //MysqlConn::connect所需要的参数     string m_ip;     string m_user;     string m_passwd;     string m_dbName;     unsigned short m_port;      //连接池的参数     int m_minSize;//最小连接数     int m_maxSize;//最大连接数     int m_timeout;//超时等待时间     int m_maxIdleTime;//待回收线程的超时时间      queue<MysqlConn*> m_connectionQ;//任务队列     mutex m_mutexQ;//互斥锁     condition_variable m_cond;//条件变量 }; 

(4)ConnectionPool.cpp

#include "ConnectionPool.h" #include <json/json.h> #include <fstream> #include <thread> using namespace Json; ConnectionPool* ConnectionPool::getConnectPool() {     static ConnectionPool pool;     return &pool; }  bool ConnectionPool::parseJsonFile() {     ifstream ifs("dbconf.json");     Reader rd;     Value root;     rd.parse(ifs, root);     if (root.isObject())     {         m_ip = root["ip"].asString();         m_port = root["port"].asInt();         m_user = root["userName"].asString();         m_passwd = root["password"].asString();         m_dbName = root["dbName"].asString();         m_minSize = root["minSize"].asInt();         m_maxSize = root["maxSize"].asInt();         m_maxIdleTime = root["maxIdleTime"].asInt();         m_timeout = root["timeout"].asInt();         return true;     }     return false; }  void ConnectionPool::produceConnection() {     while (true)     {         unique_lock<mutex> locker(m_mutexQ);         while (m_connectionQ.size() >= m_minSize)         {             m_cond.wait(locker);         }         addConnection();         m_cond.notify_all();     } }  void ConnectionPool::recycleConnection() {     while (true)     {         this_thread::sleep_for(chrono::milliseconds(500));         lock_guard<mutex> locker(m_mutexQ);         while (m_connectionQ.size() > m_minSize)         {             MysqlConn* conn = m_connectionQ.front();             if (conn->getAliveTime() >= m_maxIdleTime)             {                 m_connectionQ.pop();                 delete conn;             }             else             {                 break;             }         }     } }  void ConnectionPool::addConnection() {     MysqlConn* conn = new MysqlConn;     conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);     conn->refreshAliveTime();     m_connectionQ.push(conn); }  shared_ptr<MysqlConn> ConnectionPool::getConnection() {     unique_lock<mutex> locker(m_mutexQ);     while (m_connectionQ.empty())     {         if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout)))         {             if (m_connectionQ.empty())             {                 //return nullptr;                 continue;             }         }     }     shared_ptr<MysqlConn> connptr(m_connectionQ.front(), [this](MysqlConn* conn) {         lock_guard<mutex> locker(m_mutexQ);         conn->refreshAliveTime();         m_connectionQ.push(conn);         });//自定义shared_ptr的析构方法     m_connectionQ.pop();     m_cond.notify_all();     return connptr; }  ConnectionPool::~ConnectionPool() {     while (!m_connectionQ.empty())     {         MysqlConn* conn = m_connectionQ.front();         m_connectionQ.pop();         delete conn;     } }  ConnectionPool::ConnectionPool() {     // 加载配置文件     if (!parseJsonFile())     {         return;     }      for (int i = 0; i < m_minSize; ++i)     {         addConnection();     }     thread producer(&ConnectionPool::produceConnection, this);     thread recycler(&ConnectionPool::recycleConnection, this);     producer.detach();     recycler.detach(); } 

2.2 测试代码main.cpp

#include <iostream> #include <memory> #include "MysqlConn.h" #include "ConnectionPool.h" using namespace std; // 1. 单线程: 使用/不使用连接池 // 2. 多线程: 使用/不使用连接池  void op1(int begin, int end) {     for (int i = begin; i < end; ++i)     {         MysqlConn conn;         conn.connect("root", "123159", "testdb", "192.168.237.131");         char sql[1024] = { 0 };         sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);         conn.update(sql);     } }  void op2(ConnectionPool* pool, int begin, int end) {     for (int i = begin; i < end; ++i)     {         shared_ptr<MysqlConn> conn = pool->getConnection();         char sql[1024] = { 0 };         sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);         conn->update(sql);     } }  void test1() { #if 1     // 非连接池, 单线程, 用时: 21037278300 纳秒, 21037 毫秒     steady_clock::time_point begin = steady_clock::now();     op1(0, 5000);     steady_clock::time_point end = steady_clock::now();     auto length = end - begin;     cout << "非连接池, 单线程, 用时: " << length.count() << " 纳秒, "         << length.count() / 1000000 << " 毫秒" << endl; #else     // 连接池, 单线程, 用时: 8838406500 纳秒, 8838 毫秒     ConnectionPool* pool = ConnectionPool::getConnectPool();     steady_clock::time_point begin = steady_clock::now();     op2(pool, 0, 5000);     steady_clock::time_point end = steady_clock::now();     auto length = end - begin;     cout << "连接池, 单线程, 用时: " << length.count() << " 纳秒, "         << length.count() / 1000000 << " 毫秒" << endl;  #endif }  void test2() { #if 0     // 非连接池, 多单线程, 用时: 13277417000 纳秒, 13277 毫秒     MysqlConn conn;     conn.connect("root", "root", "testdb", "192.168.237.131");     steady_clock::time_point begin = steady_clock::now();     thread t1(op1, 0, 1000);     thread t2(op1, 1000, 2000);     thread t3(op1, 2000, 3000);     thread t4(op1, 3000, 4000);     thread t5(op1, 4000, 5000);     t1.join();     t2.join();     t3.join();     t4.join();     t5.join();     steady_clock::time_point end = steady_clock::now();     auto length = end - begin;     cout << "非连接池, 多单线程, 用时: " << length.count() << " 纳秒, "         << length.count() / 1000000 << " 毫秒" << endl;  #else     // 连接池, 多单线程, 用时: 3938502100 纳秒, 3938 毫秒     ConnectionPool* pool = ConnectionPool::getConnectPool();     steady_clock::time_point begin = steady_clock::now();     thread t1(op2, pool, 0, 1000);     thread t2(op2, pool, 1000, 2000);     thread t3(op2, pool, 2000, 3000);     thread t4(op2, pool, 3000, 4000);     thread t5(op2, pool, 4000, 5000);     t1.join();     t2.join();     t3.join();     t4.join();     t5.join();     steady_clock::time_point end = steady_clock::now();     auto length = end - begin;     cout << "连接池, 多单线程, 用时: " << length.count() << " 纳秒, "         << length.count() / 1000000 << " 毫秒" << endl;  #endif }  int query() {     MysqlConn conn;     bool tt = conn.connect("root", "123159", "testdb", "127.0.0.1");     cout << "tt:  " << tt << endl;     string sql = "insert into person values(7, 25, 'man', 'tom')";     bool flag = conn.update(sql);     cout << "flag value:  " << flag << endl;      sql = "select * from person";     conn.query(sql);     while (conn.next())     {         cout << conn.value(0) << ", "             << conn.value(1) << ", "             << conn.value(2) << ", "             << conn.value(3) << endl;     }     return 0; } int main() {     test2();     return 0; } 

3.配置

需要配置jsoncpp和mysql
(1)配置jasoncpp

(2)配置mysql

4.基础知识

4.1MySQL在C语言中的API

参考:https://www.mysqlzh.com/doc/196/115.html

4.2 jsoncpp的基本知识

(1)基本函数

(2)解析Json格式数据

5.参考

https://www.bilibili.com/video/BV1Fr4y1s7w4
https://subingwen.cn/cpp/dbconnectionPool/

发表评论

相关文章