TCPClient.cpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. /*
  2. * @Description: TCPClient
  3. * @Version: 1.0
  4. * @Autor: lishengyin
  5. * @Date: 2021-09-14 09:12:25
  6. * @LastEditors: lishengyin
  7. * @LastEditTime: 2022-09-28 09:50:50
  8. */
  9. #include "TCPClient.hpp"
  10. #include "NettyServerCommandEnum.h"
  11. #include "NettyServerResultMsg.h"
  12. #include "NettyClientResultMsg.h"
  13. #include "NettyClientCommandEnum.h"
  14. #include "RecLoginMsg.h"
  15. #include "NettyHttpNull.h"
  16. #include <vector>
  17. #include <condition_variable>
  18. #include <mutex>
  19. #include <functional>
  20. #include <thread>
  21. #include <queue>
  22. #include <future>
  23. #include "config.hpp"
  24. // web页面
  25. namespace gsd{
  26. /**
  27. * @description: 连接成功时触发
  28. * @param {SockException} &ex 连接info
  29. * @return {*}
  30. * @author: lishengyin
  31. */
  32. void TCPClient::onConnect(const SockException &ex){
  33. //连接结果事件
  34. InfoL << (ex ? ex.what() : "success, local_ip:") << get_local_ip();
  35. config::getPtr()->localIP = get_local_ip();
  36. NoticeCenter::Instance().emitEvent(NOTICE_CONNECTNETTY);
  37. }
  38. /**
  39. * @description: 接收数据时触发
  40. * @param {Ptr} &pBuf 接受到的数据
  41. * @return {*}
  42. * @author: lishengyin
  43. */
  44. void TCPClient::onRecv(const Buffer::Ptr &pBuf){
  45. //接收数据事件
  46. std::string json = pBuf->toString();
  47. NettyServerResultMsg<NettyHttpNull> msg;
  48. if(msg.jsonToObject(json)){
  49. auto iter = requestTasks.find(msg.getRequestId());
  50. if(iter != requestTasks.end()){
  51. int status = requestAck(json);
  52. auto task = std::make_shared<std::packaged_task<void()>>(std::bind(std::forward<function<void(int, std::string)>>(iter->second.task), std::forward<int>(status), std::forward<std::string>(json)));
  53. std::shared_ptr<thread> th = std::make_shared<thread>([task](){
  54. (*task)();
  55. });
  56. // 标记request完成
  57. iter->second.Finish = true;
  58. th->join();
  59. return;
  60. }
  61. }
  62. // 广播数据
  63. NoticeCenter::Instance().emitEvent(NOTICE_NETTY, pBuf);
  64. }
  65. /**
  66. * @description: 发送请求
  67. * @param {string} RequestId
  68. * @param {string&} data
  69. * @param {function<void(std::string)>} t
  70. * @return {*}
  71. */
  72. void TCPClient::sendRequest(std::string RequestId,std::string CommandEnum, std::string& data,function<void(int, std::string)> t){
  73. if(CommandEnum != "heartbeat") InfoL << "requestId: " << RequestId + ", CommandEnum: " << CommandEnum << endl;
  74. RequestAckTask acktask;
  75. acktask.ticker = std::make_shared<Ticker>(); // 创建滴答定时器
  76. acktask.task = t;
  77. acktask.CommandEnum = CommandEnum;
  78. this->requestTasks[RequestId] = acktask;
  79. // 发送信息
  80. if(this->alive()) *this << data;
  81. }
  82. /**
  83. * @description: 接收请求应答
  84. * @param {*}
  85. * @return {*}
  86. */
  87. int TCPClient::requestAck(std::string& json){
  88. NettyServerResultMsg<RecLoginMsg> nettyServerResultMsg;
  89. if(!nettyServerResultMsg.jsonToObject(json)) return Forbiddem;
  90. if(nettyServerResultMsg.getData().msg == "HeartBeat") return Ok;
  91. std::string str = " requestId: " + nettyServerResultMsg.getRequestId() + ", dataType: " + nettyServerResultMsg.getDataType() + ", code: " + nettyServerResultMsg.getData().code + ", msg: " + nettyServerResultMsg.getData().msg;
  92. if(nettyServerResultMsg.getData().code == "0" || nettyServerResultMsg.getData().code == "-5"){
  93. DebugL << str << endl;
  94. string::size_type idx;
  95. idx = nettyServerResultMsg.getData().msg.find("未找到设备数据");
  96. if(idx != string::npos) return Forbiddem;
  97. return Ok;
  98. }
  99. WarnL << str << endl;
  100. return Forbiddem;
  101. }
  102. /**
  103. * @description: 发送阻塞时触发
  104. * @param {*}
  105. * @return {*}
  106. * @author: lishengyin
  107. */
  108. void TCPClient::onFlush(){
  109. //发送阻塞后,缓存清空事件
  110. }
  111. /**
  112. * @description: EOF时触发
  113. * @param {SockException} &ex 错误信息
  114. * @return {*}
  115. * @author: lishengyin
  116. */
  117. void TCPClient::onError(const SockException &err){
  118. //断开连接事件,一般是EOF
  119. _nTick = 0;
  120. ErrorL << err.what();
  121. // 广播数据
  122. NoticeCenter::Instance().emitEvent(NOTICE_DISCONNECT);
  123. }
  124. /**
  125. * @description: 请求检查
  126. * @param {*}
  127. * @return {*}
  128. * @author: lishengyin
  129. */
  130. void TCPClient::onManager(){
  131. if(this->requestTasks.empty() || !this->alive()) return;
  132. vector<std::string> keys;
  133. for(auto iter = this->requestTasks.begin(); iter != this->requestTasks.end(); iter++){
  134. if(iter->second.ticker == nullptr) continue;
  135. // 检查任务是否超时或已完成
  136. if(iter->second.ticker->elapsedTime() >= Request_timeout || iter->second.Finish){
  137. if(!iter->second.Finish) this->requestOvertimeWork(iter->first,iter->second.CommandEnum);
  138. keys.push_back(iter->first);
  139. }
  140. }
  141. for(auto iter = keys.begin(); iter != keys.end(); iter++){
  142. this->requestTasks.erase(*iter);
  143. }
  144. }
  145. /**
  146. * @description: 超时处理
  147. * @param {string} &requestId
  148. * @param {string&} CommandEnum
  149. * @return {*}
  150. */
  151. void TCPClient::requestOvertimeWork(std::string requestId, std::string CommandEnum){
  152. WarnL << "requestId: " << requestId << ", CommandEnum:" << CommandEnum << endl;
  153. int status = Request_Time_out;
  154. auto iter = this->requestTasks.find(requestId);
  155. if(iter == this->requestTasks.end()) return;
  156. std::string json = "";
  157. auto task = std::make_shared<std::packaged_task<void()>>(std::bind(std::forward<function<void(int, std::string)>>(iter->second.task), std::forward<int>(status), std::forward<std::string>(json)));
  158. // 执行回调函数
  159. (*task)();
  160. }
  161. };