user_app.cpp 12 KB


#include "user_app.h"

#include <cstring>
  2. namespace MIVA
  3. {
  4. ThreadPool poolInfer(6,ThreadPool::PRIORITY_HIGHEST, false);
  5. std::shared_ptr<UserApp> UserApp::CreateNew(){
  6. return std::make_shared<UserApp>();
  7. }
  8. UserApp::UserApp(){
  9. }
  10. UserApp::~UserApp(){
  11. Destroy();
  12. }
  13. // APP初始化
  14. int32_t UserApp::Init(std::string appName)
  15. {
  16. //设置日志
  17. Logger::Instance().add(std::make_shared<ConsoleChannel>());
  18. Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
  19. this->m_appName = appName;
  20. // 获取基础配置
  21. if(m_ini.load(m_configSrc) != OK) {
  22. ErrorL << "No configuration file is found, please check if the configuration file exists!";
  23. return ERR;
  24. }
  25. int ret = 0;
  26. this->Netty_ip = m_ini.getStringValue("Netty", "Netty_ip", ret);
  27. this->Netty_port = m_ini.getIntValue("Netty", "Netty_port", ret);
  28. this->Netty_idName = m_ini.getStringValue("Netty", "Netty_idName", ret);
  29. this->PIS_ip = m_ini.getStringValue("PIS", "PIS_ip", ret);
  30. this->PIS_port = m_ini.getIntValue("PIS", "PIS_port", ret);
  31. this->PIS_IdName = m_ini.getStringValue("PIS", "PIS_IdName", ret);
  32. this->sql_ip = m_ini.getStringValue("MySql", "sql_ip", ret);
  33. this->sql_port = m_ini.getIntValue("MySql", "sql_port", ret);
  34. this->user = m_ini.getStringValue("MySql", "user", ret);
  35. this->password = m_ini.getStringValue("MySql", "password", ret);
  36. this->character = m_ini.getStringValue("MySql", "character", ret);
  37. this->device_id = m_ini.getIntValue("USER", "device_id", ret);
  38. // 链接Mysql
  39. #if defined(SUPPORT_DYNAMIC_TEMPLATE)
  40. //初始化数据
  41. SqlPool::Instance().Init(this->sql_ip,this->sql_port,"",this->user,this->password/*,character*/);
  42. #else
  43. //由于需要编译器对可变参数模板的支持,所以gcc5.0以下一般都不支持,否则编译报错
  44. ErrorL << "your compiler does not support variable parameter templates!" << endl;
  45. return -1;
  46. #endif //defined(SUPPORT_DYNAMIC_TEMPLATE)
  47. // 初始化数据库连接池
  48. SqlPool::Instance().setSize(3 + thread::hardware_concurrency());
  49. #if defined(CREATEDB)
  50. vector<vector<string>> sql;
  51. // 创建数据库
  52. SqlWriter("create database MIVA_DB;", false) << sql;
  53. // 创建表
  54. #endif
  55. // 链接Netty后端
  56. ConnectNetty();
  57. // UDP绑定端口
  58. this->m_udpClient = Socket::createSocket();
  59. this->m_udpClient->bindUdpSock(this->PIS_port);
  60. vector<vector<string>> sqlRet;
  61. // 清洗旧数据
  62. SqlWriter sqlUpdata("UPDATE MIVA_DB.DataSources SET Num=0");
  63. sqlUpdata << sqlRet;
  64. // 查询需要播放的流
  65. SqlWriter sqlSelect("SELECT Id,Uri FROM MIVA_DB.`DataSources` WHERE Play = 1 and Del = 0");
  66. sqlSelect << sqlRet;
  67. if(!sqlRet.empty()){
  68. for(auto &line : sqlRet){
  69. DataSource data;
  70. data.Id = std::atoi(line[0].c_str());
  71. data.uri = line[1];
  72. this->DataList.push_back(data);
  73. }
  74. // 初始化Deepstream
  75. m_Infer = Inference::CreateNew();
  76. if(m_Infer == NULL){
  77. ErrorL << "Inference module creation failed!";
  78. return ERR;
  79. }
  80. if(m_Infer->Init(this->DataList) != OK){
  81. ErrorL << "Inference module initialization failed";
  82. return ERR;
  83. }
  84. }else{
  85. InfoL << "The stream to be played is not found, please add it in the background.";
  86. }
  87. // 监听推理广播
  88. NoticeCenter::Instance().addListener(0,NOTICE_INFER,
  89. [&](int Source_id, int num){
  90. this->ListenInfer(Source_id, num);
  91. });
  92. // 监听Neety广播
  93. NoticeCenter::Instance().addListener(0, NOTICE_NETTY,
  94. [&](const Buffer::Ptr &pBuf){
  95. this->ListenNettyData(pBuf);
  96. });
  97. // 监听关门广播
  98. NoticeCenter::Instance().addListener(0, NOTICE_CLOSED,[&](){
  99. this->ListenClosed();
  100. });
  101. InfoL << "System initialization is successful!";
  102. return OK;
  103. }
  104. // 销毁对象
  105. void UserApp::Destroy()
  106. {
  107. InfoL << "System exited successfully!";
  108. }
  109. // 启动任务
  110. void UserApp::StartTask()
  111. {
  112. // 挂起任务一
  113. poolInfer.async([&](){
  114. if(this->m_Infer != NULL) m_Infer->ReadyTask();
  115. });
  116. // 挂起函数回调
  117. this->m_udpClient->setOnRead([&](const Buffer::Ptr &buf, struct sockaddr *addr , int){
  118. this->ListenPISData(buf, addr);
  119. });
  120. // 定时检查连接状态
  121. this->m_timer0 = std::make_shared<Timer>(5.0f,[&](){
  122. if(!m_tcpClient->alive()){
  123. this->ConnectNetty();
  124. }
  125. return true;
  126. }, nullptr);
  127. // 定时上报数据
  128. this->m_timer1 = std::make_shared<Timer>(1.0f,[&](){
  129. this->ReportData(); // 上报数据
  130. return true;
  131. }, nullptr);
  132. poolInfer.start();
  133. InfoL << "Task started successfully!";
  134. }
  135. // 监听推理广播
  136. void UserApp::ListenInfer(int Source_id, int num)
  137. {
  138. if(this->play == true){
  139. this->m_timer2 = std::make_shared<Timer>(20.0f,[&](){
  140. // 暂停任务
  141. this->m_Infer->PauseTask();
  142. return false;
  143. }, nullptr);
  144. this->play = false;
  145. }
  146. char ctime[80];
  147. vector<vector<std::string>> sqlRet;
  148. getDataTime(ctime);
  149. int id = this->DataList[Source_id].Id;
  150. SqlWriter updataSql("UPDATE MIVA_DB.DataSources SET Num = '?',DataTime = '?' WHERE Id = '?';");
  151. updataSql << num << ctime << id << sqlRet;
  152. }
  153. // 连接Netty
  154. void UserApp::ConnectNetty()
  155. {
  156. if(this->m_tcpClient == NULL) this->m_tcpClient = TCPClient::Ptr(new TCPClient());
  157. this->m_tcpClient->startConnect(this->Netty_ip, this->Netty_port);
  158. }
  159. // 上报识别结果
  160. void UserApp::ReportData()
  161. {
  162. // 查询数据
  163. vector<vector<std::string>> sqlRet;
  164. PIDSClientResultMsg clentResultMsg;
  165. clentResultMsg.Train = 04;
  166. clentResultMsg.TrainLine = 01;
  167. SqlWriter sqlSelect("SELECT CarId,Grade,Num FROM MIVA_DB.`CarData`");
  168. sqlSelect << sqlRet;
  169. if(sqlRet.empty()){
  170. ErrorL << "Car data not found.";
  171. return;
  172. }
  173. for(auto &line : sqlRet){
  174. ResultData resultData;
  175. resultData.Grade = atoi(line[1].c_str());
  176. resultData.Num = atoi(line[2].c_str());
  177. clentResultMsg.data[atoi(line[0].c_str())] = resultData;
  178. }
  179. uint8_t data[1024];
  180. std::string json;
  181. memset(data, 0, sizeof(data));
  182. // 序列化数据
  183. clentResultMsg.Serialization(data);
  184. clentResultMsg.ObjectToJson(json);
  185. // 传输至PIS系统
  186. struct sockaddr addrDst;
  187. makeAddr(&addrDst,this->PIS_ip.c_str(),this->PIS_port);//UDP数据发送地址
  188. this->m_udpClient->send((char *)data,48,&addrDst, sizeof(struct sockaddr_in));
  189. // 传输至Netty端
  190. if(this->m_tcpClient->alive()){
  191. (*m_tcpClient) << json;
  192. }
  193. }
  194. // 处理Netty的数据
  195. void UserApp::ListenNettyData(const Buffer::Ptr &pBuf)
  196. {
  197. // 对Netty端的接口
  198. }
  199. // 处理PIS系统的数据
  200. void UserApp::ListenPISData(const Buffer::Ptr &buf, struct sockaddr *addr)
  201. {
  202. static int8_t num = 0;
  203. vector<vector<std::string>> sqlRet;
  204. // 接受到PIS发送的数据
  205. PIDSServerResultMsg serverResultMag;
  206. // 反序列化
  207. if(serverResultMag.Deserialization((uint8_t *)(buf->data()), buf->size()) == OK)
  208. {
  209. // 将收到的第一条PIS系统的数据作为空载荷
  210. // 同步
  211. char ctime[80];
  212. if(num == 0){
  213. for(int id=1; id <= 6 ; id ++){
  214. SqlWriter updataSql("UPDATE MIVA_DB.CarData SET NoLoad = '?',dataTime = '?' WHERE CarId = '?';");
  215. getDataTime(ctime);
  216. updataSql << serverResultMag.DynamicLoad[id] << ctime << id << sqlRet;
  217. }
  218. num++;
  219. }else
  220. {
  221. // 记录动态载荷
  222. for(int id=1; id <= 6 ; id ++){
  223. SqlWriter updataSql("UPDATE MIVA_DB.CarData SET DynamicLoad = '?', dataTime = '?' WHERE CarId = '?';");
  224. getDataTime(ctime);
  225. updataSql << serverResultMag.DynamicLoad[id] << ctime << id << sqlRet;
  226. }
  227. }
  228. // 获取到关好门的信号
  229. if((serverResultMag.DoorFlag & 0x80))
  230. {
  231. // 广播收到关门的信号
  232. NoticeCenter::Instance().emitEvent(NOTICE_CLOSED);
  233. }
  234. }
  235. }
  236. //赋值struct sockaddr
  237. void UserApp::makeAddr(struct sockaddr *out,const char *ip,uint16_t port){
  238. struct sockaddr_in &servaddr = *((struct sockaddr_in *)out);
  239. servaddr.sin_family = AF_INET;
  240. servaddr.sin_port = htons(port);
  241. servaddr.sin_addr.s_addr = inet_addr(ip);
  242. bzero(&(servaddr.sin_zero), sizeof servaddr.sin_zero);
  243. }
  244. //监听关门信号
  245. void UserApp::ListenClosed()
  246. {
  247. if(this->m_Infer != NULL) m_Infer->StartTask();
  248. this->play = true;
  249. }
  250. // 监听推理数据
  251. void UserApp::ListenInferData()
  252. {
  253. // 监听推理结束
  254. // 获取CarId、空载荷值、动态载荷值
  255. std::map<int, CarData> carDatas;
  256. vector<vector<string>> sqlRet;
  257. SqlWriter sqlSelect0("SELECT CarId,NoLoad,DynamicLoad FROM MIVA_DB.`CarData`");
  258. sqlSelect0 << sqlRet;
  259. if(sqlRet.empty()){
  260. ErrorL << "Car data not found.";
  261. return;
  262. }
  263. for(auto &line : sqlRet){
  264. CarData data;
  265. data.loadData.NoLoad = atoi(line[1].c_str());
  266. data.loadData.DynamicLoad = atoi(line[2].c_str());
  267. carDatas[atoi(line[0].c_str())] = data;
  268. }
  269. SqlWriter sqlSelect1("SELECT CarId,Uri,Num FROM MIVA_DB.`DataSources` WHERE Play = 1;");
  270. sqlRet.clear();
  271. sqlSelect1 << sqlRet;
  272. for (auto &line : sqlRet)
  273. {
  274. CarInferData data;
  275. data.uri = line[1];
  276. data.num = atoi(line[2].c_str());
  277. carDatas[atoi(line[0].c_str())].inferData.push_back(data);
  278. }
  279. // 提取所需的数据成功
  280. // 数据计算
  281. sqlRet.clear();
  282. std::map<int, CarData>::iterator iter;
  283. for(iter=carDatas.begin(); iter!=carDatas.end(); iter++)
  284. {
  285. SqlWriter sqlUpdata("UPDATE MIVA_DB.CarData SET Grade = '?', Num = '?', dataTime = '?' WHERE CarId = '?';");
  286. int sum = 0;
  287. std::list<CarInferData>::iterator it;
  288. for (it = (iter->second.inferData.begin()); it != (iter->second.inferData.end()); it++){
  289. sum += it->num;
  290. }
  291. int result = DataCalculation(iter->second.loadData.NoLoad, iter->second.loadData.DynamicLoad, sum);
  292. int grade = GradeDetermination(result);
  293. char ctime[80];
  294. getDataTime(ctime);
  295. // 更新数据
  296. sqlUpdata << grade << result << ctime << iter->first << sqlRet;
  297. }
  298. }
  299. // 数据计算
  300. uint32_t UserApp::DataCalculation(uint32_t noLoad, uint32_t dynamicLoad, int num)
  301. {
  302. uint32_t result = 0;
  303. return result;
  304. }
  305. // 拥挤度等级判定
  306. uint8_t UserApp::GradeDetermination(int num)
  307. {
  308. // 等级1 - 4
  309. uint8_t grade = 1;
  310. return grade;
  311. }
  312. }