#include "user_app.h"

namespace MIVA {

// Thread pool used for the inference task. It is constructed with 6 threads and
// the highest priority; StartTask() calls start() on it explicitly.
ThreadPool poolInfer(6, ThreadPool::PRIORITY_HIGHEST, false);

// Factory: returns a new shared UserApp instance.
std::shared_ptr<UserApp> UserApp::CreateNew()
{
    return std::make_shared<UserApp>();
}

UserApp::UserApp()
{
}

UserApp::~UserApp()
{
    Destroy();
}

// Initializes the application.
//
// Sets up logging, loads the configuration file (m_configSrc), configures the
// SQL pool, connects to the Netty backend, binds the UDP socket, loads the
// streams flagged for playback from the database, initializes the inference
// module when at least one stream exists, and registers the broadcast listeners.
//
// @param appName  name stored in m_appName.
// @return OK on success; ERR when the configuration cannot be loaded or the
//         inference module cannot be created/initialized; -1 when the build
//         lacks SUPPORT_DYNAMIC_TEMPLATE.
int32_t UserApp::Init(std::string appName)
{
    // Configure logging.
    Logger::Instance().add(std::make_shared<ConsoleChannel>());
    Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());

    this->m_appName = appName;

    // Load the base configuration.
    if (m_ini.load(m_configSrc) != OK) {
        ErrorL << "No configuration file is found, please check if the configuration file exists!";
        return ERR;
    }

    // NOTE(review): `ret` is written by every getter but never inspected, so a
    // missing key is silently accepted — confirm whether that is intended.
    int ret = 0;
    this->Netty_ip     = m_ini.getStringValue("Netty", "Netty_ip", ret);
    this->Netty_port   = m_ini.getIntValue("Netty", "Netty_port", ret);
    this->Netty_idName = m_ini.getStringValue("Netty", "Netty_idName", ret);

    this->PIS_ip     = m_ini.getStringValue("PIS", "PIS_ip", ret);
    this->PIS_port   = m_ini.getIntValue("PIS", "PIS_port", ret);
    this->PIS_IdName = m_ini.getStringValue("PIS", "PIS_IdName", ret);

    this->sql_ip    = m_ini.getStringValue("MySql", "sql_ip", ret);
    this->sql_port  = m_ini.getIntValue("MySql", "sql_port", ret);
    this->user      = m_ini.getStringValue("MySql", "user", ret);
    this->password  = m_ini.getStringValue("MySql", "password", ret);
    this->character = m_ini.getStringValue("MySql", "character", ret);

    this->device_id = m_ini.getIntValue("USER", "device_id", ret);

    // Connect to MySQL.
#if defined(SUPPORT_DYNAMIC_TEMPLATE)
    // Initialize the connection parameters.
    SqlPool::Instance().Init(this->sql_ip, this->sql_port, "", this->user, this->password/*,character*/);
#else
    // Variadic template support is required; gcc below 5.0 generally lacks it
    // and would fail to compile otherwise.
    ErrorL << "your compiler does not support variable parameter templates!" << std::endl;
    return -1;
#endif //defined(SUPPORT_DYNAMIC_TEMPLATE)

    // Size the database connection pool.
    SqlPool::Instance().setSize(3 + std::thread::hardware_concurrency());

#if defined(CREATEDB)
    std::vector<std::vector<std::string>> sql;
    // Create the database.
    SqlWriter("create database MIVA_DB;", false) << sql;
    // Create the tables.
#endif

    // Connect to the Netty backend.
    ConnectNetty();

    // Bind the UDP socket to the PIS port.
    this->m_udpClient = Socket::createSocket();
    this->m_udpClient->bindUdpSock(this->PIS_port);

    std::vector<std::vector<std::string>> sqlRet;

    // Reset stale counters.
    SqlWriter sqlUpdata("UPDATE MIVA_DB.DataSources SET Num=0");
    sqlUpdata << sqlRet;

    // Query the streams that should be played.
    SqlWriter sqlSelect("SELECT Id,Uri FROM MIVA_DB.`DataSources` WHERE Play = 1 and Del = 0");
    sqlSelect << sqlRet;

    if (!sqlRet.empty()) {
        for (auto &line : sqlRet) {
            // Skip rows that do not carry both selected columns instead of
            // indexing past the end of the row.
            if (line.size() < 2) {
                continue;
            }
            DataSource data;
            data.Id  = std::atoi(line[0].c_str());
            data.uri = line[1];
            this->DataList.push_back(data);
        }

        // Initialize the inference module.
        m_Infer = Inference::CreateNew();
        if (m_Infer == nullptr) {
            ErrorL << "Inference module creation failed!";
            return ERR;
        }
        if (m_Infer->Init(this->DataList) != OK) {
            ErrorL << "Inference module initialization failed";
            return ERR;
        }
    } else {
        InfoL << "The stream to be played is not found, please add it in the background.";
    }

    // The callbacks below are stored by NoticeCenter and outlive this call, so
    // they capture `this` explicitly rather than by-reference default.
    // NOTE(review): these listeners are never removed (see Destroy()); they
    // must not fire after this object is destroyed.

    // Listen for inference broadcasts.
    NoticeCenter::Instance().addListener(0, NOTICE_INFER,
        [this](int Source_id, int num) {
            this->ListenInfer(Source_id, num);
        });

    // Listen for Netty broadcasts.
    NoticeCenter::Instance().addListener(0, NOTICE_NETTY,
        [this](const Buffer::Ptr &pBuf) {
            this->ListenNettyData(pBuf);
        });

    // Listen for door-closed broadcasts.
    NoticeCenter::Instance().addListener(0, NOTICE_CLOSED,
        [this]() {
            this->ListenClosed();
        });

    InfoL << "System initialization is successful!";
    return OK;
}

// Tears the application down. Currently this only logs.
// NOTE(review): the NoticeCenter listeners registered in Init() capture `this`
// and are not unregistered here — confirm the object lives for the whole process.
void UserApp::Destroy()
{
    InfoL << "System exited successfully!";
}

// Starts the background work: queues the inference "ready" task, installs the
// UDP read callback, creates the 5 s connection-check timer and the 1 s report
// timer, then starts the inference thread pool.
void UserApp::StartTask()
{
    // Task 1: prepare the inference pipeline on the pool.
    poolInfer.async([this]() {
        if (this->m_Infer != nullptr) {
            this->m_Infer->ReadyTask();
        }
    });

    // Install the UDP read callback. The socket only exists after a successful
    // Init(), so guard against a null pointer.
    if (this->m_udpClient != nullptr) {
        this->m_udpClient->setOnRead([this](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
            this->ListenPISData(buf, addr);
        });
    } else {
        ErrorL << "UDP socket is not initialized, PIS data will not be received!";
    }

    // Periodically check the Netty connection and reconnect when it is down.
    // ConnectNetty() creates the client when it does not exist yet.
    this->m_timer0 = std::make_shared<Timer>(5.0f, [this]() {
        if (this->m_tcpClient == nullptr || !this->m_tcpClient->alive()) {
            this->ConnectNetty();
        }
        return true;
    }, nullptr);

    // Periodically report data.
    this->m_timer1 = std::make_shared<Timer>(1.0f, [this]() {
        this->ReportData();
        return true;
    }, nullptr);

    poolInfer.start();
    InfoL << "Task started successfully!";
}

// Handles an inference broadcast.
//
// When `play` is set, creates a 20 s timer whose callback pauses the inference
// task and returns false, then clears `play`. Afterwards stores the reported
// count and the current timestamp for the data source at index Source_id.
//
// @param Source_id  index into DataList (validated here).
// @param num        value written to the Num column.
void UserApp::ListenInfer(int Source_id, int num)
{
    if (this->play == true) {
        this->m_timer2 = std::make_shared<Timer>(20.0f, [this]() {
            // Pause the inference task.
            if (this->m_Infer != nullptr) {
                this->m_Infer->PauseTask();
            }
            return false;
        }, nullptr);
        this->play = false;
    }

    // The index comes from a broadcast; reject anything outside DataList
    // instead of reading out of bounds.
    if (Source_id < 0 || static_cast<std::size_t>(Source_id) >= this->DataList.size()) {
        ErrorL << "Inference result references an unknown data source index: " << Source_id;
        return;
    }

    char timeStr[80] = {0};
    std::vector<std::vector<std::string>> sqlRet;
    getDataTime(timeStr);

    int id = this->DataList[Source_id].Id;
    SqlWriter updataSql("UPDATE MIVA_DB.DataSources SET Num = '?',DataTime = '?' WHERE Id = '?';");
    updataSql << num << timeStr << id << sqlRet;
}

// Connects to the Netty backend, creating the TCP client on first use.
void UserApp::ConnectNetty()
{
    if (this->m_tcpClient == nullptr) {
        this->m_tcpClient = TCPClient::Ptr(new TCPClient());
    }
    this->m_tcpClient->startConnect(this->Netty_ip, this->Netty_port);
}

// Reports the recognition results: reads CarId/Grade/Num from the CarData
// table, sends the serialized message to the PIS address over UDP and, when the
// TCP client is alive, sends the JSON form to Netty.
void UserApp::ReportData()
{
    // Number of bytes of the serialized buffer sent to PIS.
    // NOTE(review): presumably the fixed PIS frame length — confirm against
    // PIDSClientResultMsg::Serialization.
    static const int kPisPayloadBytes = 48;

    // Query the data.
    std::vector<std::vector<std::string>> sqlRet;
    PIDSClientResultMsg clentResultMsg;
    clentResultMsg.Train     = 4;
    clentResultMsg.TrainLine = 1;

    SqlWriter sqlSelect("SELECT CarId,Grade,Num FROM MIVA_DB.`CarData`");
    sqlSelect << sqlRet;

    if (sqlRet.empty()) {
        ErrorL << "Car data not found.";
        return;
    }

    for (auto &line : sqlRet) {
        // Skip rows that do not carry all three selected columns.
        if (line.size() < 3) {
            continue;
        }
        ResultData resultData;
        resultData.Grade = std::atoi(line[1].c_str());
        resultData.Num   = std::atoi(line[2].c_str());
        // NOTE(review): CarId is used directly as the index/key of `data`;
        // verify its valid range against PIDSClientResultMsg.
        clentResultMsg.data[std::atoi(line[0].c_str())] = resultData;
    }

    uint8_t data[1024] = {0};
    std::string json;

    // Serialize the message.
    clentResultMsg.Serialization(data);
    clentResultMsg.ObjectToJson(json);

    // Send to the PIS system (UDP destination address).
    if (this->m_udpClient != nullptr) {
        struct sockaddr addrDst;
        makeAddr(&addrDst, this->PIS_ip.c_str(), this->PIS_port);
        this->m_udpClient->send(reinterpret_cast<char *>(data), kPisPayloadBytes,
                                &addrDst, sizeof(struct sockaddr_in));
    }

    // Send to the Netty side.
    if (this->m_tcpClient != nullptr && this->m_tcpClient->alive()) {
        (*m_tcpClient) << json;
    }
}

// Handles data broadcast from Netty. Not implemented yet.
void UserApp::ListenNettyData(const Buffer::Ptr &pBuf)
{
    // Interface towards the Netty side.
}

// Handles a datagram received from the PIS system.
//
// After successful deserialization, the first datagram's load values are
// stored in the NoLoad column for CarId 1..6; every later datagram updates the
// DynamicLoad column. When bit 0x80 of DoorFlag is set, NOTICE_CLOSED is emitted.
//
// @param buf   received payload.
// @param addr  sender address (unused).
void UserApp::ListenPISData(const Buffer::Ptr &buf, struct sockaddr *addr)
{
    // Highest CarId updated; ids run from 1 to kCarCount inclusive.
    // NOTE(review): assumes DynamicLoad has at least kCarCount + 1 elements — confirm.
    static const int kCarCount = 6;
    // False until the first valid datagram has been stored as the no-load baseline.
    static bool noLoadCaptured = false;

    if (buf == nullptr) {
        return;
    }

    std::vector<std::vector<std::string>> sqlRet;

    // Message received from PIS.
    PIDSServerResultMsg serverResultMag;

    // Deserialize.
    if (serverResultMag.Deserialization((uint8_t *)(buf->data()), buf->size()) != OK) {
        return;
    }

    // The first PIS datagram is treated as the no-load reading; later ones
    // record the dynamic load.
    const char *sql = noLoadCaptured
        ? "UPDATE MIVA_DB.CarData SET DynamicLoad = '?', dataTime = '?' WHERE CarId = '?';"
        : "UPDATE MIVA_DB.CarData SET NoLoad = '?',dataTime = '?' WHERE CarId = '?';";

    char timeStr[80] = {0};
    for (int id = 1; id <= kCarCount; ++id) {
        SqlWriter updataSql(sql);
        getDataTime(timeStr);
        updataSql << serverResultMag.DynamicLoad[id] << timeStr << id << sqlRet;
    }
    noLoadCaptured = true;

    // Door-closed signal received.
    if ((serverResultMag.DoorFlag & 0x80)) {
        // Broadcast that the doors are closed.
        NoticeCenter::Instance().emitEvent(NOTICE_CLOSED);
    }
}

// Fills a struct sockaddr with an IPv4 address.
//
// @param out   destination, written as a sockaddr_in; ignored when null.
// @param ip    dotted-decimal IPv4 string passed to inet_addr(); ignored when null.
// @param port  port in host byte order.
void UserApp::makeAddr(struct sockaddr *out, const char *ip, uint16_t port)
{
    if (out == nullptr || ip == nullptr) {
        return;
    }
    struct sockaddr_in &servaddr = *reinterpret_cast<struct sockaddr_in *>(out);
    servaddr.sin_family      = AF_INET;
    servaddr.sin_port        = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);
    bzero(&(servaddr.sin_zero), sizeof servaddr.sin_zero);
}

// Handles the door-closed broadcast: starts the inference task (when the
// module exists) and sets `play`.
void UserApp::ListenClosed()
{
    if (this->m_Infer != nullptr) {
        m_Infer->StartTask();
    }
    this->play = true;
}

// Aggregates inference results per car and writes Grade/Num back to CarData.
//
// Reads NoLoad/DynamicLoad per CarId from CarData, collects the Num of every
// DataSources row with Play = 1 per CarId, sums those counts, and stores the
// results of DataCalculation()/GradeDetermination() with the current timestamp.
void UserApp::ListenInferData()
{
    // Fetch CarId, no-load value and dynamic-load value.
    std::map<int, CarData> carDatas;
    std::vector<std::vector<std::string>> sqlRet;

    SqlWriter sqlSelect0("SELECT CarId,NoLoad,DynamicLoad FROM MIVA_DB.`CarData`");
    sqlSelect0 << sqlRet;

    if (sqlRet.empty()) {
        ErrorL << "Car data not found.";
        return;
    }

    for (auto &line : sqlRet) {
        // Skip rows that do not carry all three selected columns.
        if (line.size() < 3) {
            continue;
        }
        CarData data;
        data.loadData.NoLoad      = std::atoi(line[1].c_str());
        data.loadData.DynamicLoad = std::atoi(line[2].c_str());
        carDatas[std::atoi(line[0].c_str())] = data;
    }

    SqlWriter sqlSelect1("SELECT CarId,Uri,Num FROM MIVA_DB.`DataSources` WHERE Play = 1;");
    sqlRet.clear();
    sqlSelect1 << sqlRet;

    for (auto &line : sqlRet) {
        if (line.size() < 3) {
            continue;
        }
        CarInferData data;
        data.uri = line[1];
        data.num = std::atoi(line[2].c_str());
        carDatas[std::atoi(line[0].c_str())].inferData.push_back(data);
    }

    // All required data collected; compute and store the results.
    sqlRet.clear();
    for (auto &entry : carDatas) {
        SqlWriter sqlUpdata("UPDATE MIVA_DB.CarData SET Grade = '?', Num = '?', dataTime = '?' WHERE CarId = '?';");

        int sum = 0;
        for (const auto &infer : entry.second.inferData) {
            sum += infer.num;
        }

        // Kept as int so the values are formatted as numbers in the SQL text.
        int result = DataCalculation(entry.second.loadData.NoLoad,
                                     entry.second.loadData.DynamicLoad, sum);
        int grade  = GradeDetermination(result);

        char timeStr[80] = {0};
        getDataTime(timeStr);

        // Update the row.
        sqlUpdata << grade << result << timeStr << entry.first << sqlRet;
    }
}

// Data calculation. Placeholder: ignores its arguments and always returns 0.
uint32_t UserApp::DataCalculation(uint32_t noLoad, uint32_t dynamicLoad, int num)
{
    uint32_t result = 0;
    return result;
}

// Crowding grade determination (grades 1 - 4). Placeholder: ignores its
// argument and always returns 1.
uint8_t UserApp::GradeDetermination(int num)
{
    uint8_t grade = 1;
    return grade;
}

} // namespace MIVA