123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- #include "user_app.h"
- namespace MIVA
- {
- ThreadPool poolInfer(6,ThreadPool::PRIORITY_HIGHEST, false);
- std::shared_ptr<UserApp> UserApp::CreateNew(){
- return std::make_shared<UserApp>();
- }
- UserApp::UserApp(){
-
- }
-
- UserApp::~UserApp(){
- Destroy();
- }
- // APP初始化
- int32_t UserApp::Init(std::string appName)
- {
- //设置日志
- Logger::Instance().add(std::make_shared<ConsoleChannel>());
- Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
- this->m_appName = appName;
- // 获取基础配置
- if(m_ini.load(m_configSrc) != OK) {
- ErrorL << "No configuration file is found, please check if the configuration file exists!";
- return ERR;
- }
- int ret = 0;
- this->Netty_ip = m_ini.getStringValue("Netty", "Netty_ip", ret);
- this->Netty_port = m_ini.getIntValue("Netty", "Netty_port", ret);
- this->Netty_idName = m_ini.getStringValue("Netty", "Netty_idName", ret);
- this->PIS_ip = m_ini.getStringValue("PIS", "PIS_ip", ret);
- this->PIS_port = m_ini.getIntValue("PIS", "PIS_port", ret);
- this->PIS_IdName = m_ini.getStringValue("PIS", "PIS_IdName", ret);
- this->sql_ip = m_ini.getStringValue("MySql", "sql_ip", ret);
- this->sql_port = m_ini.getIntValue("MySql", "sql_port", ret);
- this->user = m_ini.getStringValue("MySql", "user", ret);
- this->password = m_ini.getStringValue("MySql", "password", ret);
- this->character = m_ini.getStringValue("MySql", "character", ret);
- this->device_id = m_ini.getIntValue("USER", "device_id", ret);
- // 链接Mysql
- #if defined(SUPPORT_DYNAMIC_TEMPLATE)
- //初始化数据
- SqlPool::Instance().Init(this->sql_ip,this->sql_port,"",this->user,this->password/*,character*/);
- #else
- //由于需要编译器对可变参数模板的支持,所以gcc5.0以下一般都不支持,否则编译报错
- ErrorL << "your compiler does not support variable parameter templates!" << endl;
- return -1;
- #endif //defined(SUPPORT_DYNAMIC_TEMPLATE)
- // 初始化数据库连接池
- SqlPool::Instance().setSize(3 + thread::hardware_concurrency());
- #if defined(CREATEDB)
- vector<vector<string>> sql;
- // 创建数据库
- SqlWriter("create database MIVA_DB;", false) << sql;
- // 创建表
- #endif
- // 链接Netty后端
- ConnectNetty();
- // UDP绑定端口
- this->m_udpClient = Socket::createSocket();
- this->m_udpClient->bindUdpSock(this->PIS_port);
-
- vector<vector<string>> sqlRet;
- // 清洗旧数据
- SqlWriter sqlUpdata("UPDATE MIVA_DB.DataSources SET Num=0");
- sqlUpdata << sqlRet;
- // 查询需要播放的流
- SqlWriter sqlSelect("SELECT Id,Uri FROM MIVA_DB.`DataSources` WHERE Play = 1 and Del = 0");
- sqlSelect << sqlRet;
- if(!sqlRet.empty()){
- for(auto &line : sqlRet){
- DataSource data;
- data.Id = std::atoi(line[0].c_str());
- data.uri = line[1];
- this->DataList.push_back(data);
- }
- // 初始化Deepstream
- m_Infer = Inference::CreateNew();
- if(m_Infer == NULL){
- ErrorL << "Inference module creation failed!";
- return ERR;
- }
- if(m_Infer->Init(this->DataList) != OK){
- ErrorL << "Inference module initialization failed";
- return ERR;
- }
- }else{
- InfoL << "The stream to be played is not found, please add it in the background.";
- }
- // 监听推理广播
- NoticeCenter::Instance().addListener(0,NOTICE_INFER,
- [&](int Source_id, int num){
- this->ListenInfer(Source_id, num);
- });
- // 监听Neety广播
- NoticeCenter::Instance().addListener(0, NOTICE_NETTY,
- [&](const Buffer::Ptr &pBuf){
- this->ListenNettyData(pBuf);
- });
-
- // 监听关门广播
- NoticeCenter::Instance().addListener(0, NOTICE_CLOSED,[&](){
- this->ListenClosed();
- });
- InfoL << "System initialization is successful!";
- return OK;
- }
- // 销毁对象
- void UserApp::Destroy()
- {
- InfoL << "System exited successfully!";
- }
- // 启动任务
- void UserApp::StartTask()
- {
- // 挂起任务一
- poolInfer.async([&](){
- if(this->m_Infer != NULL) m_Infer->ReadyTask();
- });
- // 挂起函数回调
- this->m_udpClient->setOnRead([&](const Buffer::Ptr &buf, struct sockaddr *addr , int){
- this->ListenPISData(buf, addr);
- });
- // 定时检查连接状态
- this->m_timer0 = std::make_shared<Timer>(5.0f,[&](){
- if(!m_tcpClient->alive()){
- this->ConnectNetty();
- }
- return true;
- }, nullptr);
- // 定时上报数据
- this->m_timer1 = std::make_shared<Timer>(1.0f,[&](){
- this->ReportData(); // 上报数据
- return true;
- }, nullptr);
- poolInfer.start();
- InfoL << "Task started successfully!";
- }
- // 监听推理广播
- void UserApp::ListenInfer(int Source_id, int num)
- {
- if(this->play == true){
- this->m_timer2 = std::make_shared<Timer>(20.0f,[&](){
-
- // 暂停任务
- this->m_Infer->PauseTask();
- return false;
- }, nullptr);
- this->play = false;
- }
- char ctime[80];
- vector<vector<std::string>> sqlRet;
- getDataTime(ctime);
- int id = this->DataList[Source_id].Id;
- SqlWriter updataSql("UPDATE MIVA_DB.DataSources SET Num = '?',DataTime = '?' WHERE Id = '?';");
- updataSql << num << ctime << id << sqlRet;
- }
- // 连接Netty
- void UserApp::ConnectNetty()
- {
- if(this->m_tcpClient == NULL) this->m_tcpClient = TCPClient::Ptr(new TCPClient());
- this->m_tcpClient->startConnect(this->Netty_ip, this->Netty_port);
- }
- // 上报识别结果
- void UserApp::ReportData()
- {
- // 查询数据
- vector<vector<std::string>> sqlRet;
- PIDSClientResultMsg clentResultMsg;
- clentResultMsg.Train = 04;
- clentResultMsg.TrainLine = 01;
- SqlWriter sqlSelect("SELECT CarId,Grade,Num FROM MIVA_DB.`CarData`");
- sqlSelect << sqlRet;
- if(sqlRet.empty()){
- ErrorL << "Car data not found.";
- return;
- }
- for(auto &line : sqlRet){
- ResultData resultData;
- resultData.Grade = atoi(line[1].c_str());
- resultData.Num = atoi(line[2].c_str());
- clentResultMsg.data[atoi(line[0].c_str())] = resultData;
- }
- uint8_t data[1024];
- std::string json;
- memset(data, 0, sizeof(data));
- // 序列化数据
- clentResultMsg.Serialization(data);
- clentResultMsg.ObjectToJson(json);
- // 传输至PIS系统
- struct sockaddr addrDst;
- makeAddr(&addrDst,this->PIS_ip.c_str(),this->PIS_port);//UDP数据发送地址
- this->m_udpClient->send((char *)data,48,&addrDst, sizeof(struct sockaddr_in));
- // 传输至Netty端
- if(this->m_tcpClient->alive()){
- (*m_tcpClient) << json;
- }
- }
- // 处理Netty的数据
- void UserApp::ListenNettyData(const Buffer::Ptr &pBuf)
- {
- // 对Netty端的接口
- }
- // 处理PIS系统的数据
- void UserApp::ListenPISData(const Buffer::Ptr &buf, struct sockaddr *addr)
- {
- static int8_t num = 0;
- vector<vector<std::string>> sqlRet;
- // 接受到PIS发送的数据
- PIDSServerResultMsg serverResultMag;
- // 反序列化
- if(serverResultMag.Deserialization((uint8_t *)(buf->data()), buf->size()) == OK)
- {
- // 将收到的第一条PIS系统的数据作为空载荷
- // 同步
- char ctime[80];
- if(num == 0){
- for(int id=1; id <= 6 ; id ++){
- SqlWriter updataSql("UPDATE MIVA_DB.CarData SET NoLoad = '?',dataTime = '?' WHERE CarId = '?';");
- getDataTime(ctime);
- updataSql << serverResultMag.DynamicLoad[id] << ctime << id << sqlRet;
- }
- num++;
- }else
- {
- // 记录动态载荷
- for(int id=1; id <= 6 ; id ++){
- SqlWriter updataSql("UPDATE MIVA_DB.CarData SET DynamicLoad = '?', dataTime = '?' WHERE CarId = '?';");
- getDataTime(ctime);
- updataSql << serverResultMag.DynamicLoad[id] << ctime << id << sqlRet;
- }
- }
- // 获取到关好门的信号
- if((serverResultMag.DoorFlag & 0x80))
- {
- // 广播收到关门的信号
- NoticeCenter::Instance().emitEvent(NOTICE_CLOSED);
- }
- }
- }
- //赋值struct sockaddr
- void UserApp::makeAddr(struct sockaddr *out,const char *ip,uint16_t port){
- struct sockaddr_in &servaddr = *((struct sockaddr_in *)out);
- servaddr.sin_family = AF_INET;
- servaddr.sin_port = htons(port);
- servaddr.sin_addr.s_addr = inet_addr(ip);
- bzero(&(servaddr.sin_zero), sizeof servaddr.sin_zero);
- }
- //监听关门信号
- void UserApp::ListenClosed()
- {
- if(this->m_Infer != NULL) m_Infer->StartTask();
- this->play = true;
- }
- // 监听推理数据
- void UserApp::ListenInferData()
- {
- // 监听推理结束
- // 获取CarId、空载荷值、动态载荷值
- std::map<int, CarData> carDatas;
- vector<vector<string>> sqlRet;
- SqlWriter sqlSelect0("SELECT CarId,NoLoad,DynamicLoad FROM MIVA_DB.`CarData`");
- sqlSelect0 << sqlRet;
- if(sqlRet.empty()){
- ErrorL << "Car data not found.";
- return;
- }
- for(auto &line : sqlRet){
- CarData data;
- data.loadData.NoLoad = atoi(line[1].c_str());
- data.loadData.DynamicLoad = atoi(line[2].c_str());
- carDatas[atoi(line[0].c_str())] = data;
- }
- SqlWriter sqlSelect1("SELECT CarId,Uri,Num FROM MIVA_DB.`DataSources` WHERE Play = 1;");
- sqlRet.clear();
- sqlSelect1 << sqlRet;
- for (auto &line : sqlRet)
- {
- CarInferData data;
- data.uri = line[1];
- data.num = atoi(line[2].c_str());
- carDatas[atoi(line[0].c_str())].inferData.push_back(data);
- }
- // 提取所需的数据成功
- // 数据计算
- sqlRet.clear();
- std::map<int, CarData>::iterator iter;
- for(iter=carDatas.begin(); iter!=carDatas.end(); iter++)
- {
- SqlWriter sqlUpdata("UPDATE MIVA_DB.CarData SET Grade = '?', Num = '?', dataTime = '?' WHERE CarId = '?';");
- int sum = 0;
- std::list<CarInferData>::iterator it;
- for (it = (iter->second.inferData.begin()); it != (iter->second.inferData.end()); it++){
- sum += it->num;
- }
- int result = DataCalculation(iter->second.loadData.NoLoad, iter->second.loadData.DynamicLoad, sum);
- int grade = GradeDetermination(result);
- char ctime[80];
- getDataTime(ctime);
- // 更新数据
- sqlUpdata << grade << result << ctime << iter->first << sqlRet;
- }
- }
- // 数据计算
- uint32_t UserApp::DataCalculation(uint32_t noLoad, uint32_t dynamicLoad, int num)
- {
- uint32_t result = 0;
-
- return result;
- }
- // 拥挤度等级判定
- uint8_t UserApp::GradeDetermination(int num)
- {
- // 等级1 - 4
- uint8_t grade = 1;
- return grade;
- }
- }
|