123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- /*
- * @Description: TCPClient
- * @Version: 1.0
- * @Autor: lishengyin
- * @Date: 2021-09-14 09:12:25
- * @LastEditors: lishengyin
- * @LastEditTime: 2022-09-28 09:50:50
- */
- #include "TCPClient.hpp"
- #include "NettyServerCommandEnum.h"
- #include "NettyServerResultMsg.h"
- #include "NettyClientResultMsg.h"
- #include "NettyClientCommandEnum.h"
- #include "RecLoginMsg.h"
- #include "NettyHttpNull.h"
- #include <vector>
- #include <condition_variable>
- #include <mutex>
- #include <functional>
- #include <thread>
- #include <queue>
- #include <future>
- #include "config.hpp"
- // web页面
- namespace gsd{
- /**
- * @description: 连接成功时触发
- * @param {SockException} &ex 连接info
- * @return {*}
- * @author: lishengyin
- */
- void TCPClient::onConnect(const SockException &ex){
- //连接结果事件
- InfoL << (ex ? ex.what() : "success, local_ip:") << get_local_ip();
- config::getPtr()->localIP = get_local_ip();
- NoticeCenter::Instance().emitEvent(NOTICE_CONNECTNETTY);
- }
- /**
- * @description: 接收数据时触发
- * @param {Ptr} &pBuf 接受到的数据
- * @return {*}
- * @author: lishengyin
- */
- void TCPClient::onRecv(const Buffer::Ptr &pBuf){
- //接收数据事件
- std::string json = pBuf->toString();
- NettyServerResultMsg<NettyHttpNull> msg;
- if(msg.jsonToObject(json)){
- auto iter = requestTasks.find(msg.getRequestId());
- if(iter != requestTasks.end()){
- int status = requestAck(json);
- auto task = std::make_shared<std::packaged_task<void()>>(std::bind(std::forward<function<void(int, std::string)>>(iter->second.task), std::forward<int>(status), std::forward<std::string>(json)));
- std::shared_ptr<thread> th = std::make_shared<thread>([task](){
- (*task)();
- });
- // 标记request完成
- iter->second.Finish = true;
- th->join();
- return;
- }
- }
- // 广播数据
- NoticeCenter::Instance().emitEvent(NOTICE_NETTY, pBuf);
- }
- /**
- * @description: 发送请求
- * @param {string} RequestId
- * @param {string&} data
- * @param {function<void(std::string)>} t
- * @return {*}
- */
- void TCPClient::sendRequest(std::string RequestId,std::string CommandEnum, std::string& data,function<void(int, std::string)> t){
- if(CommandEnum != "heartbeat") InfoL << "requestId: " << RequestId + ", CommandEnum: " << CommandEnum << endl;
- RequestAckTask acktask;
- acktask.ticker = std::make_shared<Ticker>(); // 创建滴答定时器
- acktask.task = t;
- acktask.CommandEnum = CommandEnum;
- this->requestTasks[RequestId] = acktask;
- // 发送信息
- if(this->alive()) *this << data;
- }
- /**
- * @description: 接收请求应答
- * @param {*}
- * @return {*}
- */
- int TCPClient::requestAck(std::string& json){
- NettyServerResultMsg<RecLoginMsg> nettyServerResultMsg;
- if(!nettyServerResultMsg.jsonToObject(json)) return Forbiddem;
- if(nettyServerResultMsg.getData().msg == "HeartBeat") return Ok;
- std::string str = " requestId: " + nettyServerResultMsg.getRequestId() + ", dataType: " + nettyServerResultMsg.getDataType() + ", code: " + nettyServerResultMsg.getData().code + ", msg: " + nettyServerResultMsg.getData().msg;
- if(nettyServerResultMsg.getData().code == "0" || nettyServerResultMsg.getData().code == "-5"){
- DebugL << str << endl;
- string::size_type idx;
- idx = nettyServerResultMsg.getData().msg.find("未找到设备数据");
- if(idx != string::npos) return Forbiddem;
- return Ok;
- }
- WarnL << str << endl;
- return Forbiddem;
- }
- /**
- * @description: 发送阻塞时触发
- * @param {*}
- * @return {*}
- * @author: lishengyin
- */
- void TCPClient::onFlush(){
- //发送阻塞后,缓存清空事件
-
- }
- /**
- * @description: EOF时触发
- * @param {SockException} &ex 错误信息
- * @return {*}
- * @author: lishengyin
- */
- void TCPClient::onError(const SockException &err){
- //断开连接事件,一般是EOF
- _nTick = 0;
- ErrorL << err.what();
- // 广播数据
- NoticeCenter::Instance().emitEvent(NOTICE_DISCONNECT);
- }
- /**
- * @description: 请求检查
- * @param {*}
- * @return {*}
- * @author: lishengyin
- */
- void TCPClient::onManager(){
- if(this->requestTasks.empty() || !this->alive()) return;
- vector<std::string> keys;
- for(auto iter = this->requestTasks.begin(); iter != this->requestTasks.end(); iter++){
- if(iter->second.ticker == nullptr) continue;
- // 检查任务是否超时或已完成
- if(iter->second.ticker->elapsedTime() >= Request_timeout || iter->second.Finish){
- if(!iter->second.Finish) this->requestOvertimeWork(iter->first,iter->second.CommandEnum);
- keys.push_back(iter->first);
- }
- }
- for(auto iter = keys.begin(); iter != keys.end(); iter++){
- this->requestTasks.erase(*iter);
- }
- }
- /**
- * @description: 超时处理
- * @param {string} &requestId
- * @param {string&} CommandEnum
- * @return {*}
- */
- void TCPClient::requestOvertimeWork(std::string requestId, std::string CommandEnum){
- WarnL << "requestId: " << requestId << ", CommandEnum:" << CommandEnum << endl;
- int status = Request_Time_out;
- auto iter = this->requestTasks.find(requestId);
- if(iter == this->requestTasks.end()) return;
- std::string json = "";
- auto task = std::make_shared<std::packaged_task<void()>>(std::bind(std::forward<function<void(int, std::string)>>(iter->second.task), std::forward<int>(status), std::forward<std::string>(json)));
- // 执行回调函数
- (*task)();
- }
- };
|