/* * @Description: USERAPP * @Version: 1.0 * @Autor: lishengyin * @Date: 2021-10-13 09:35:42 * @LastEditors: lishengyin * @LastEditTime: 2022-07-20 16:07:28 */ #include "user_app.h" struct timespec time1 = {0, 0}; struct timespec time2 = {0, 0}; struct timespec time3 = {0, 0}; namespace gsd_ds { /** * @description: 创建对象 * @param {*} * @return {*} * @author: lishengyin */ std::shared_ptr UserApp::CreateNew(){ return std::make_shared(); } /** * @description: UserApp() * @return {*} */ UserApp::UserApp(){ } /** * @description: ~UserApp() * @return {*} */ UserApp::~UserApp(){ } /** * @description: 初始化 * @param {string} appName * @return {*} * @author: lishengyin */ int32_t UserApp::Init(std::string appName) { this->m_appName = appName; Disposition::getPtr()->updateData(); recorder::getPtr()->outDir = Disposition::getPtr()->getOutDir(); // 链接Mysql #if defined(SUPPORT_DYNAMIC_TEMPLATE) //初始化数据 SqlPool::Instance().Init(Disposition::getPtr()->getMysqlIP(),Disposition::getPtr()->getMysqlPort(),"",Disposition::getPtr()->getUser(),Disposition::getPtr()->getPassword()/*,character*/); #else //由于需要编译器对可变参数模板的支持,所以gcc5.0以下一般都不支持,否则编译报错 ErrorL << "your compiler does not support variable parameter templates!" << endl; return -1; #endif //defined(SUPPORT_DYNAMIC_TEMPLATE) // 初始化数据库连接池 SqlPool::Instance().setSize(3 + thread::hardware_concurrency()); this->m_InferInfo = InferInfo::CreateNew(); // 初始化Deepstream m_Infer = Inference::CreateNew(); if(m_Infer == NULL){ ErrorL << "Inference module creation failed!" << endl; return ERR; } SqlWriter sqlSelect("SELECT Id,Uri,RecognitionRange,dv_Id FROM gsdDB.`DataSources` WHERE Play = 1 and Del = 0"); vector> sqlRet; sqlSelect << sqlRet; if(!sqlRet.empty()){ int sourceId = 0; for(auto &line : sqlRet){ DataSource data; data.Id = std::atoi(line[0].c_str()); data.uri = line[1]; data.range = line[2]; data.dv_Id = std::atoi(line[3].c_str()); data.Play = true; data.sourceId = sourceId; this->m_InferInfo->DataSources.push_back(data); sourceId++; } if(m_Infer->Init() != OK){ ErrorL << "Inference module initialization failed"; return ERR; } }else{ InfoL << "The stream to be played is not found, please add it in the background."; } NoticeCenter::Instance().addListener(0,NOTICE_QUITLOOP, [&](){ this->m_Infer->QuitLoop(); this->stop = true; this->condition.notify_one(); }); // 监听推理广播 NoticeCenter::Instance().addListener(0,NOTICE_INFER, [&](int Source_id, CNStreamInferData::Ptr data){ this->ListenInfer(Source_id, data); }); NoticeCenter::Instance().addListener(0,NOTICE_DEV, [&](int Source_id, int num){ this->ExperControler(Source_id, num); }); this->m_httpClient = HttpClient::CreateNew(); if(this->m_httpClient == nullptr) ErrorL << "无法创建HttpClinet对象" << endl; if(this->m_httpClient->Init("admin", "0.0.0.0", 9080) != OK){ ErrorL << "HttpClinet对象初始化失败" << endl; } return OK; } /** * @description: 释放数据 * @param {*} * @return {*} * @author: lishengyin */ void UserApp::Destroy() { this->m_Infer->Destory(); InfoL << "System exited successfully!" << endl; } /** * @description: 启动任务 * @param {*} * @return {*} * @author: lishengyin */ void UserApp::StartTask() { // 定时监控数据变化 this->m_timer1 = std::make_shared(3.0f,[&](){ // 监听数据 this->MonitorData(); return true; },nullptr); InfoL << "Task started successfully!" << endl; // 启动推理 this->StartInfer(); } /** * @description: StartInfer * @return {*} */ void UserApp::StartInfer(){ InfoL; while(!stop){ if(m_InferInfo->DataSources.empty()){ std::unique_lock lock(this->m_mutex); this->condition.wait(lock, [this]{ return this->stop || !m_InferInfo->DataSources.empty(); }); if(stop) break; } if(!m_InferInfo->DataSources.empty() && !stop) m_Infer->StartTask(); if(stop) break; } } /** * 监听数据 * @param {*} * @return {*} */ void UserApp::MonitorData() { static int flag = 0; vector> sqlRet; vector::iterator iter; // 查询 std::string sql = "SELECT Id,Uri,RecognitionRange,dv_Id FROM gsdDB.`DataSources` WHERE Del = 0 and Play = 1"; SqlWriter sqlSelect(sql.c_str()); sqlSelect << sqlRet; if(!sqlRet.empty()){ int sourceId = 0; if(this->m_InferInfo->DataSources.empty()) { for(auto &line : sqlRet){ DataSource data; data.Id = std::atoi(line[0].c_str()); data.uri = line[1]; data.range = line[2]; data.dv_Id = std::atoi(line[3].c_str()); data.Play = true; data.sourceId = sourceId; this->m_InferInfo->DataSources.push_back(data); sourceId++; } if(!this->m_Infer->getAlive()){ // 初始化Deepstream if(this->m_Infer->Init() != OK){ ErrorL << "Inference module initialization failed" << endl; } condition.notify_one(); }else{ // 添加数据源 for(auto& data : this->m_InferInfo->DataSources){ data.source_bin = this->m_Infer->add_sources(data.sourceId, data.uri); } this->m_Infer->RestartTask(); } return; } int size = this->m_InferInfo->DataSources.size(); for(int i = 0; i < size; i++) { if(i >= (int)sqlRet.size()) continue; this->m_InferInfo->DataSources[i].Id = atoi(sqlRet[i][0].c_str()); this->m_InferInfo->DataSources[i].range = sqlRet[i][2]; this->m_InferInfo->DataSources[i].Play = true; this->m_InferInfo->DataSources[i].dv_Id = atoi(sqlRet[i][3].c_str()); if(this->m_InferInfo->DataSources[i].uri != sqlRet[i][1]){ this->m_Infer->ModifyUri(this->m_InferInfo->DataSources[i].source_bin, sqlRet[i][1]); flag = 1; } this->m_InferInfo->DataSources[i].uri = sqlRet[i][1]; } if(size == (int)sqlRet.size()) { if(flag) this->m_Infer->RestartTask(); flag = 0; return; } if(size > (int)sqlRet.size()){ int num = 0; // stop for(int i = sqlRet.size(); i < size; i++){ this->m_Infer->stop_release_source(i); this->m_InferInfo->DataSources[i].source_bin = NULL; num++; } for(int i = 0; i < num; i++) this->m_InferInfo->DataSources.pop_back(); }else{ // add for(int i = size; i < (int)sqlRet.size(); i++){ DataSource data; data.Id = std::atoi(sqlRet[i][0].c_str()); data.uri = sqlRet[i][1]; data.range = sqlRet[i][2]; data.dv_Id = std::atoi(sqlRet[i][3].c_str()); data.Play = true; data.sourceId = i; data.source_bin = this->m_Infer->add_sources(data.sourceId, data.uri); this->m_InferInfo->DataSources.push_back(data); } } this->m_Infer->RestartTask(); this->m_Infer->SetBatch(this->m_InferInfo->DataSources.size()); }else{ if(this->m_InferInfo->DataSources.empty() && !this->m_Infer->getAlive()) return; for(iter=this->m_InferInfo->DataSources.begin(); iter!=this->m_InferInfo->DataSources.end(); iter++){ this->m_Infer->stop_release_source(iter->sourceId); iter->source_bin = NULL; } // 清除数据 this->m_InferInfo->DataSources.clear(); this->m_Infer->RestartTask(); } } /** * @description: 赋值struct sockaddr * @param {sockaddr} *out * @param {char} *ip * @param {uint16_t} port * @return {*} * @author: lishengyin */ void UserApp::makeAddr(struct sockaddr *out,const char *ip,uint16_t port){ struct sockaddr_in &servaddr = *((struct sockaddr_in *)out); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); servaddr.sin_addr.s_addr = inet_addr(ip); bzero(&(servaddr.sin_zero), sizeof servaddr.sin_zero); } /** * @description: 监听推理模块广播 * @param {int} Source_id 数据源ID * @param {CNStreamInferData} 推理指针 * @return {*} * @author: lishengyin */ void UserApp::ListenInfer(int Source_id, CNStreamInferData::Ptr data){ if(data == nullptr) return; std::string json; data->dv_Id = this->m_InferInfo->DataSources[Source_id].dv_Id; data->objectToJson(json); if(this->m_httpClient->Post_Bird(json) != OK){ ErrorL << "请求发送失败" << endl; } } /** * @description: 驱鸟控制器 * @param {int} Source_id * @param {int} num * @return {*} */ void UserApp::ExperControler(int Source_id, int num){ static std::shared_ptr ticker = nullptr; if(num > 0){ if(ticker != nullptr) ticker->resetTime(); if(this -> m_DevPowerState == false){ // 控制驱鸟设备 if(this->m_httpClient != nullptr) this->m_httpClient->ControlDev(ExpelControler().fire); // 更新状态 this -> m_DevPowerState = true; } }else{ if(ticker == nullptr) ticker = std::make_shared(); int time = 10000; if((int)(ticker->elapsedTime()) > time && this->m_DevPowerState){ this->m_DevPowerState = false; ticker = nullptr; // CLOSE if(this->m_httpClient != nullptr) this->m_httpClient->ControlDev(ExpelControler().close); } } } }