SqlPool.h 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef SQL_SQLPOOL_H_
  11. #define SQL_SQLPOOL_H_
  12. #include <deque>
  13. #include <mutex>
  14. #include <memory>
  15. #include <sstream>
  16. #include <functional>
  17. #include "logger.h"
  18. #include "Poller/Timer.h"
  19. #include "SqlConnection.h"
  20. #include "Thread/WorkThreadPool.h"
  21. #include "ResourcePool.h"
  22. namespace toolkit {
  23. class SqlPool : public std::enable_shared_from_this<SqlPool> {
  24. public:
  25. using Ptr = std::shared_ptr<SqlPool>;
  26. using PoolType = ResourcePool<SqlConnection>;
  27. using SqlRetType = std::vector<std::vector<std::string> >;
  28. static SqlPool &Instance();
  29. ~SqlPool() {
  30. _timer.reset();
  31. flushError();
  32. _threadPool.reset();
  33. _pool.reset();
  34. InfoL;
  35. }
  36. /**
  37. * 设置循环池对象个数
  38. * @param size
  39. */
  40. void setSize(int size) {
  41. checkInited();
  42. _pool->setSize(size);
  43. }
  44. /**
  45. * 初始化循环池,设置数据库连接参数
  46. * @tparam Args
  47. * @param arg
  48. */
  49. template<typename ...Args>
  50. void Init(Args &&...arg) {
  51. _pool.reset(new PoolType(std::forward<Args>(arg)...));
  52. _pool->obtain();
  53. }
  54. /**
  55. * 异步执行sql
  56. * @param str sql语句
  57. * @param tryCnt 重试次数
  58. */
  59. template<typename ...Args>
  60. void asyncQuery(Args &&...args) {
  61. asyncQuery_l(SqlConnection::queryString(std::forward<Args>(args)...));
  62. }
  63. /**
  64. * 同步执行sql
  65. * @tparam Args 可变参数类型列表
  66. * @param arg 可变参数列表
  67. * @return 影响行数
  68. */
  69. template<typename ...Args>
  70. int64_t syncQuery(Args &&...arg) {
  71. checkInited();
  72. typename PoolType::ValuePtr mysql;
  73. try {
  74. //捕获执行异常
  75. mysql = _pool->obtain();
  76. return mysql->query(std::forward<Args>(arg)...);
  77. } catch (std::exception &e) {
  78. mysql.quit();
  79. throw;
  80. }
  81. }
  82. /**
  83. * sql转义
  84. * @param str
  85. * @return
  86. */
  87. std::string escape(const std::string &str) {
  88. checkInited();
  89. return _pool->obtain()->escape(const_cast<std::string &>(str));
  90. }
  91. private:
  92. SqlPool() {
  93. _threadPool = WorkThreadPool::Instance().getExecutor();
  94. _timer = std::make_shared<Timer>(30, [this]() {
  95. flushError();
  96. return true;
  97. }, nullptr);
  98. }
  99. /**
  100. * 异步执行sql
  101. * @param sql sql语句
  102. * @param tryCnt 重试次数
  103. */
  104. void asyncQuery_l(const std::string &sql, int tryCnt = 3) {
  105. auto lam = [this, sql, tryCnt]() {
  106. int64_t rowID;
  107. auto cnt = tryCnt - 1;
  108. try {
  109. syncQuery(rowID, sql);
  110. } catch (std::exception &ex) {
  111. if (cnt > 0) {
  112. //失败重试
  113. std::lock_guard<std::mutex> lk(_error_query_mutex);
  114. sqlQuery query(sql, cnt);
  115. _error_query.push_back(query);
  116. } else {
  117. WarnL << "SqlPool::syncQuery failed: " << ex.what();
  118. }
  119. }
  120. };
  121. _threadPool->async(lam);
  122. }
  123. /**
  124. * 定时重试失败的sql
  125. */
  126. void flushError() {
  127. decltype(_error_query) query_copy;
  128. {
  129. std::lock_guard<std::mutex> lck(_error_query_mutex);
  130. query_copy.swap(_error_query);
  131. }
  132. for (auto &query : query_copy) {
  133. asyncQuery(query.sql_str, query.tryCnt);
  134. }
  135. }
  136. /**
  137. * 检查数据库连接池是否初始化
  138. */
  139. void checkInited() {
  140. if (!_pool) {
  141. throw SqlException("SqlPool::checkInited", "Mysql connection pool not initialized");
  142. }
  143. }
  144. private:
  145. struct sqlQuery {
  146. sqlQuery(const std::string &sql, int cnt) : sql_str(sql), tryCnt(cnt) {}
  147. std::string sql_str;
  148. int tryCnt = 0;
  149. };
  150. private:
  151. std::deque<sqlQuery> _error_query;
  152. TaskExecutor::Ptr _threadPool;
  153. std::mutex _error_query_mutex;
  154. std::shared_ptr<PoolType> _pool;
  155. Timer::Ptr _timer;
  156. };
  157. /**
  158. * Sql语句生成器,通过占位符'?'的方式生成sql语句
  159. */
  160. class SqlStream {
  161. public:
  162. SqlStream(const char *sql) : _sql(sql) {}
  163. ~SqlStream() {}
  164. template<typename T>
  165. SqlStream &operator<<(T &&data) {
  166. auto pos = _sql.find('?', _startPos);
  167. if (pos == std::string::npos) {
  168. return *this;
  169. }
  170. _str_tmp.str("");
  171. _str_tmp << std::forward<T>(data);
  172. std::string str = SqlPool::Instance().escape(_str_tmp.str());
  173. _startPos = pos + str.size();
  174. _sql.replace(pos, 1, str);
  175. return *this;
  176. }
  177. const std::string &operator<<(std::ostream &(*f)(std::ostream &)) const {
  178. return _sql;
  179. }
  180. operator std::string() {
  181. return _sql;
  182. }
  183. private:
  184. std::stringstream _str_tmp;
  185. std::string _sql;
  186. std::string::size_type _startPos = 0;
  187. };
  188. /**
  189. * sql查询器
  190. */
  191. class SqlWriter {
  192. public:
  193. /**
  194. * 构造函数
  195. * @param sql 带'?'占位符的sql模板
  196. * @param throwAble 是否抛异常
  197. */
  198. SqlWriter(const char *sql, bool throwAble = true) : _sqlstream(sql), _throwAble(throwAble) {}
  199. ~SqlWriter() {}
  200. /**
  201. * 输入参数替换占位符'?'以便生成sql语句;可能抛异常
  202. * @tparam T 参数类型
  203. * @param data 参数
  204. * @return 本身引用
  205. */
  206. template<typename T>
  207. SqlWriter &operator<<(T &&data) {
  208. try {
  209. _sqlstream << std::forward<T>(data);
  210. } catch (std::exception &ex) {
  211. //在转义sql时可能抛异常
  212. if (!_throwAble) {
  213. WarnL << "Commit sql failed: " << ex.what();
  214. } else {
  215. throw;
  216. }
  217. }
  218. return *this;
  219. }
  220. /**
  221. * 异步执行sql,不会抛异常
  222. * @param f std::endl
  223. */
  224. void operator<<(std::ostream &(*f)(std::ostream &)) {
  225. //异步执行sql不会抛异常
  226. SqlPool::Instance().asyncQuery((std::string) _sqlstream);
  227. }
  228. /**
  229. * 同步执行sql,可能抛异常
  230. * @tparam Row 数据行类型,可以是vector<string>/list<string>等支持 obj.emplace_back("value")操作的数据类型
  231. * 也可以是map<string,string>/Json::Value 等支持 obj["key"] = "value"操作的数据类型
  232. * @param ret 数据存放对象
  233. * @return 影响行数
  234. */
  235. template<typename Row>
  236. int64_t operator<<(std::vector<Row> &ret) {
  237. try {
  238. _affectedRows = SqlPool::Instance().syncQuery(_rowId, ret, (std::string) _sqlstream);
  239. } catch (std::exception &ex) {
  240. if (!_throwAble) {
  241. WarnL << "SqlPool::syncQuery failed: " << ex.what();
  242. } else {
  243. throw;
  244. }
  245. }
  246. return _affectedRows;
  247. }
  248. /**
  249. * 在insert数据库时返回插入的rowid
  250. * @return
  251. */
  252. int64_t getRowID() const {
  253. return _rowId;
  254. }
  255. /**
  256. * 返回影响数据库数据行数
  257. * @return
  258. */
  259. int64_t getAffectedRows() const {
  260. return _affectedRows;
  261. }
  262. private:
  263. SqlStream _sqlstream;
  264. int64_t _rowId = -1;
  265. int64_t _affectedRows = -1;
  266. bool _throwAble = true;
  267. };
  268. } /* namespace toolkit */
  269. #endif /* SQL_SQLPOOL_H_ */