SqlPool.h 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef SQL_SQLPOOL_H_
  11. #define SQL_SQLPOOL_H_
  12. #include <deque>
  13. #include <mutex>
  14. #include <memory>
  15. #include <sstream>
  16. #include <functional>
  17. #include "Poller/Timer.h"
  18. #include "logger.h"
  19. #include "SqlConnection.h"
  20. #include "Thread/WorkThreadPool.h"
  21. #include "Util/ResourcePool.h"
  22. using namespace std;
  23. namespace toolkit {
  24. class SqlPool : public std::enable_shared_from_this<SqlPool> {
  25. public:
  26. typedef std::shared_ptr<SqlPool> Ptr;
  27. typedef ResourcePool<SqlConnection> PoolType;
  28. typedef vector<vector<string> > SqlRetType;
  29. static SqlPool &Instance();
  30. ~SqlPool() {
  31. _timer.reset();
  32. flushError();
  33. _threadPool.reset();
  34. _pool.reset();
  35. InfoL;
  36. }
  37. /**
  38. * 设置循环池对象个数
  39. * @param size
  40. */
  41. void setSize(int size) {
  42. checkInited();
  43. _pool->setSize(size);
  44. }
  45. /**
  46. * 初始化循环池,设置数据库连接参数
  47. * @tparam Args
  48. * @param arg
  49. */
  50. template<typename ...Args>
  51. void Init(Args && ...arg) {
  52. _pool.reset(new PoolType(std::forward<Args>(arg)...));
  53. _pool->obtain();
  54. }
  55. /**
  56. * 异步执行sql
  57. * @param str sql语句
  58. * @param tryCnt 重试次数
  59. */
  60. template<typename ...Args>
  61. void asyncQuery(Args &&...args) {
  62. asyncQuery_l(SqlConnection::queryString(std::forward<Args>(args)...));
  63. }
  64. /**
  65. * 同步执行sql
  66. * @tparam Args 可变参数类型列表
  67. * @param arg 可变参数列表
  68. * @return 影响行数
  69. */
  70. template<typename ...Args>
  71. int64_t syncQuery(Args &&...arg) {
  72. checkInited();
  73. typename PoolType::ValuePtr mysql;
  74. try {
  75. //捕获执行异常
  76. mysql = _pool->obtain();
  77. return mysql->query(std::forward<Args>(arg)...);
  78. } catch (exception &e) {
  79. mysql.quit();
  80. throw;
  81. }
  82. }
  83. /**
  84. * sql转义
  85. * @param str
  86. * @return
  87. */
  88. string escape(const string &str) {
  89. checkInited();
  90. return _pool->obtain()->escape(const_cast<string &>(str));
  91. }
  92. private:
  93. SqlPool() {
  94. _threadPool = WorkThreadPool::Instance().getExecutor();
  95. _timer = std::make_shared<Timer>(30,[this](){
  96. flushError();
  97. return true;
  98. }, nullptr);
  99. }
  100. /**
  101. * 异步执行sql
  102. * @param sql sql语句
  103. * @param tryCnt 重试次数
  104. */
  105. void asyncQuery_l(const string &sql,int tryCnt = 3) {
  106. auto lam = [this,sql,tryCnt]() {
  107. int64_t rowID;
  108. auto cnt = tryCnt - 1;
  109. try {
  110. syncQuery(rowID,sql);
  111. }catch(exception &ex) {
  112. if( cnt > 0) {
  113. //失败重试
  114. lock_guard<mutex> lk(_error_query_mutex);
  115. sqlQuery query(sql,cnt);
  116. _error_query.push_back(query);
  117. }else{
  118. WarnL << ex.what();
  119. }
  120. }
  121. };
  122. _threadPool->async(lam);
  123. }
  124. /**
  125. * 定时重试失败的sql
  126. */
  127. void flushError() {
  128. decltype(_error_query) query_copy;
  129. {
  130. lock_guard<mutex> lck(_error_query_mutex);
  131. query_copy.swap(_error_query);
  132. }
  133. for (auto &query : query_copy) {
  134. asyncQuery(query.sql_str,query.tryCnt);
  135. }
  136. }
  137. /**
  138. * 检查数据库连接池是否初始化
  139. */
  140. void checkInited(){
  141. if(!_pool){
  142. throw SqlException("SqlPool::checkInited","数据库连接池未初始化");
  143. }
  144. }
  145. private:
  146. struct sqlQuery {
  147. sqlQuery(const string &sql,int cnt):sql_str(sql),tryCnt(cnt){}
  148. string sql_str;
  149. int tryCnt = 0;
  150. } ;
  151. private:
  152. deque<sqlQuery> _error_query;
  153. TaskExecutor::Ptr _threadPool;
  154. mutex _error_query_mutex;
  155. std::shared_ptr<PoolType> _pool;
  156. Timer::Ptr _timer;
  157. };
  158. /**
  159. * Sql语句生成器,通过占位符'?'的方式生成sql语句
  160. */
  161. class SqlStream {
  162. public:
  163. SqlStream(const char *sql) : _sql(sql){}
  164. ~SqlStream() {}
  165. template<typename T>
  166. SqlStream& operator <<(T &&data) {
  167. auto pos = _sql.find('?', _startPos);
  168. if (pos == string::npos) {
  169. return *this;
  170. }
  171. _str_tmp.str("");
  172. _str_tmp << std::forward<T>(data);
  173. string str = SqlPool::Instance().escape(_str_tmp.str());
  174. _startPos = pos + str.size();
  175. _sql.replace(pos, 1, str);
  176. return *this;
  177. }
  178. const string& operator <<(std::ostream&(*f)(std::ostream&)) const {
  179. return _sql;
  180. }
  181. operator string (){
  182. return _sql;
  183. }
  184. private:
  185. stringstream _str_tmp;
  186. string _sql;
  187. string::size_type _startPos = 0;
  188. };
  189. /**
  190. * sql查询器
  191. */
  192. class SqlWriter {
  193. public:
  194. /**
  195. * 构造函数
  196. * @param sql 带'?'占位符的sql模板
  197. * @param throwAble 是否抛异常
  198. */
  199. SqlWriter(const char *sql,bool throwAble = true) : _sqlstream(sql),_throwAble(throwAble) {}
  200. ~SqlWriter() {}
  201. /**
  202. * 输入参数替换占位符'?'以便生成sql语句;可能抛异常
  203. * @tparam T 参数类型
  204. * @param data 参数
  205. * @return 本身引用
  206. */
  207. template<typename T>
  208. SqlWriter& operator <<(T &&data) {
  209. try {
  210. _sqlstream << std::forward<T>(data);
  211. }catch (std::exception &ex){
  212. //在转义sql时可能抛异常
  213. if(!_throwAble){
  214. WarnL << ex.what();
  215. }else{
  216. throw;
  217. }
  218. }
  219. return *this;
  220. }
  221. /**
  222. * 异步执行sql,不会抛异常
  223. * @param f std::endl
  224. */
  225. void operator <<(std::ostream&(*f)(std::ostream&)) {
  226. //异步执行sql不会抛异常
  227. SqlPool::Instance().asyncQuery((string)_sqlstream);
  228. }
  229. /**
  230. * 同步执行sql,可能抛异常
  231. * @tparam Row 数据行类型,可以是vector<string>/list<string>等支持 obj.emplace_back("value")操作的数据类型
  232. * 也可以是map<string,string>/Json::Value 等支持 obj["key"] = "value"操作的数据类型
  233. * @param ret 数据存放对象
  234. * @return 影响行数
  235. */
  236. template <typename Row>
  237. int64_t operator <<(vector<Row> &ret) {
  238. try {
  239. _affectedRows = SqlPool::Instance().syncQuery(_rowId,ret, (string)_sqlstream);
  240. }catch (std::exception &ex){
  241. if(!_throwAble){
  242. WarnL << ex.what();
  243. } else {
  244. throw;
  245. }
  246. }
  247. return _affectedRows;
  248. }
  249. /**
  250. * 在insert数据库时返回插入的rowid
  251. * @return
  252. */
  253. int64_t getRowID() const {
  254. return _rowId;
  255. }
  256. /**
  257. * 返回影响数据库数据行数
  258. * @return
  259. */
  260. int64_t getAffectedRows() const {
  261. return _affectedRows;
  262. }
  263. private:
  264. SqlStream _sqlstream;
  265. int64_t _rowId = -1;
  266. int64_t _affectedRows = -1;
  267. bool _throwAble = true;
  268. };
  269. } /* namespace toolkit */
  270. #endif /* SQL_SQLPOOL_H_ */