123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- /*
- * @Description: USERAPP
- * @Version: 1.0
- * @Autor: lishengyin
- * @Date: 2021-10-13 09:35:42
- * @LastEditors: lishengyin
- * @LastEditTime: 2022-07-20 16:07:28
- */
- #include "user_app.h"
- struct timespec time1 = {0, 0};
- struct timespec time2 = {0, 0};
- struct timespec time3 = {0, 0};
- namespace gsd_ds
- {
- /**
- * @description: 创建对象
- * @param {*}
- * @return {*}
- * @author: lishengyin
- */
- std::shared_ptr<UserApp> UserApp::CreateNew(){
- return std::make_shared<UserApp>();
- }
-
- /**
- * @description: UserApp()
- * @return {*}
- */
- UserApp::UserApp(){
-
- }
-
- /**
- * @description: ~UserApp()
- * @return {*}
- */
- UserApp::~UserApp(){
- }
- /**
- * @description: 初始化
- * @param {string} appName
- * @return {*}
- * @author: lishengyin
- */
- int32_t UserApp::Init(std::string appName)
- {
- this->m_appName = appName;
- Disposition::getPtr()->updateData();
- recorder::getPtr()->outDir = Disposition::getPtr()->getOutDir();
- // 链接Mysql
- #if defined(SUPPORT_DYNAMIC_TEMPLATE)
- //初始化数据
- SqlPool::Instance().Init(Disposition::getPtr()->getMysqlIP(),Disposition::getPtr()->getMysqlPort(),"",Disposition::getPtr()->getUser(),Disposition::getPtr()->getPassword()/*,character*/);
- #else
- //由于需要编译器对可变参数模板的支持,所以gcc5.0以下一般都不支持,否则编译报错
- ErrorL << "your compiler does not support variable parameter templates!" << endl;
- return -1;
- #endif //defined(SUPPORT_DYNAMIC_TEMPLATE)
- // 初始化数据库连接池
- SqlPool::Instance().setSize(3 + thread::hardware_concurrency());
- this->m_InferInfo = InferInfo::CreateNew();
- // 初始化Deepstream
- m_Infer = Inference::CreateNew();
- if(m_Infer == NULL){
- ErrorL << "Inference module creation failed!" << endl;
- return ERR;
- }
- SqlWriter sqlSelect("SELECT Id,Uri,RecognitionRange,dv_Id FROM gsdDB.`DataSources` WHERE Play = 1 and Del = 0");
- vector<vector<string>> sqlRet;
- sqlSelect << sqlRet;
- if(!sqlRet.empty()){
- int sourceId = 0;
- for(auto &line : sqlRet){
- DataSource data;
- data.Id = std::atoi(line[0].c_str());
- data.uri = line[1];
- data.range = line[2];
- data.dv_Id = std::atoi(line[3].c_str());
- data.Play = true;
- data.sourceId = sourceId;
- this->m_InferInfo->DataSources.push_back(data);
- sourceId++;
- }
- if(m_Infer->Init() != OK){
- ErrorL << "Inference module initialization failed";
- return ERR;
- }
- }else{
- InfoL << "The stream to be played is not found, please add it in the background.";
- }
- NoticeCenter::Instance().addListener(0,NOTICE_QUITLOOP,
- [&](){
- this->m_Infer->QuitLoop();
- this->stop = true;
- this->condition.notify_one();
- });
- // 监听推理广播
- NoticeCenter::Instance().addListener(0,NOTICE_INFER,
- [&](int Source_id, CNStreamInferData::Ptr data){
- this->ListenInfer(Source_id, data);
- });
- NoticeCenter::Instance().addListener(0,NOTICE_DEV,
- [&](int Source_id, int num){
- this->ExperControler(Source_id, num);
- });
- this->m_httpClient = HttpClient::CreateNew();
- if(this->m_httpClient == nullptr) ErrorL << "无法创建HttpClinet对象" << endl;
- if(this->m_httpClient->Init("admin", "0.0.0.0", 9080) != OK){
- ErrorL << "HttpClinet对象初始化失败" << endl;
- }
- return OK;
- }
-
- /**
- * @description: 释放数据
- * @param {*}
- * @return {*}
- * @author: lishengyin
- */
- void UserApp::Destroy()
- {
- this->m_Infer->Destory();
- InfoL << "System exited successfully!" << endl;
- }
-
- /**
- * @description: 启动任务
- * @param {*}
- * @return {*}
- * @author: lishengyin
- */
- void UserApp::StartTask()
- {
- // 定时监控数据变化
- this->m_timer1 = std::make_shared<Timer>(3.0f,[&](){
- // 监听数据
- this->MonitorData();
- return true;
- },nullptr);
- InfoL << "Task started successfully!" << endl;
-
- // 启动推理
- this->StartInfer();
- }
- /**
- * @description: StartInfer
- * @return {*}
- */
- void UserApp::StartInfer(){
- InfoL;
- while(!stop){
- if(m_InferInfo->DataSources.empty()){
- std::unique_lock<std::mutex> lock(this->m_mutex);
- this->condition.wait(lock, [this]{
- return this->stop || !m_InferInfo->DataSources.empty();
- });
- if(stop) break;
- }
- if(!m_InferInfo->DataSources.empty() && !stop) m_Infer->StartTask();
- if(stop) break;
- }
- }
- /**
- * 监听数据
- * @param {*}
- * @return {*}
- */
- void UserApp::MonitorData()
- {
- static int flag = 0;
- vector<vector<string>> sqlRet;
- vector<DataSource>::iterator iter;
- // 查询
- std::string sql = "SELECT Id,Uri,RecognitionRange,dv_Id FROM gsdDB.`DataSources` WHERE Del = 0 and Play = 1";
- SqlWriter sqlSelect(sql.c_str());
- sqlSelect << sqlRet;
- if(!sqlRet.empty()){
- int sourceId = 0;
- if(this->m_InferInfo->DataSources.empty())
- {
- for(auto &line : sqlRet){
- DataSource data;
- data.Id = std::atoi(line[0].c_str());
- data.uri = line[1];
- data.range = line[2];
- data.dv_Id = std::atoi(line[3].c_str());
- data.Play = true;
- data.sourceId = sourceId;
- this->m_InferInfo->DataSources.push_back(data);
- sourceId++;
- }
- if(!this->m_Infer->getAlive()){
- // 初始化Deepstream
- if(this->m_Infer->Init() != OK){
- ErrorL << "Inference module initialization failed" << endl;
- }
- condition.notify_one();
- }else{
- // 添加数据源
- for(auto& data : this->m_InferInfo->DataSources){
- data.source_bin = this->m_Infer->add_sources(data.sourceId, data.uri);
- }
- this->m_Infer->RestartTask();
- }
- return;
- }
- int size = this->m_InferInfo->DataSources.size();
- for(int i = 0; i < size; i++)
- {
- if(i >= (int)sqlRet.size()) continue;
- this->m_InferInfo->DataSources[i].Id = atoi(sqlRet[i][0].c_str());
- this->m_InferInfo->DataSources[i].range = sqlRet[i][2];
- this->m_InferInfo->DataSources[i].Play = true;
- this->m_InferInfo->DataSources[i].dv_Id = atoi(sqlRet[i][3].c_str());
- if(this->m_InferInfo->DataSources[i].uri != sqlRet[i][1]){
- this->m_Infer->ModifyUri(this->m_InferInfo->DataSources[i].source_bin, sqlRet[i][1]);
- flag = 1;
- }
- this->m_InferInfo->DataSources[i].uri = sqlRet[i][1];
- }
- if(size == (int)sqlRet.size()) {
- if(flag) this->m_Infer->RestartTask();
- flag = 0;
- return;
- }
- if(size > (int)sqlRet.size()){
- int num = 0;
- // stop
- for(int i = sqlRet.size(); i < size; i++){
- this->m_Infer->stop_release_source(i);
- this->m_InferInfo->DataSources[i].source_bin = NULL;
- num++;
- }
- for(int i = 0; i < num; i++) this->m_InferInfo->DataSources.pop_back();
- }else{
- // add
- for(int i = size; i < (int)sqlRet.size(); i++){
- DataSource data;
- data.Id = std::atoi(sqlRet[i][0].c_str());
- data.uri = sqlRet[i][1];
- data.range = sqlRet[i][2];
- data.dv_Id = std::atoi(sqlRet[i][3].c_str());
- data.Play = true;
- data.sourceId = i;
- data.source_bin = this->m_Infer->add_sources(data.sourceId, data.uri);
- this->m_InferInfo->DataSources.push_back(data);
- }
- }
- this->m_Infer->RestartTask();
- this->m_Infer->SetBatch(this->m_InferInfo->DataSources.size());
- }else{
- if(this->m_InferInfo->DataSources.empty() && !this->m_Infer->getAlive()) return;
- for(iter=this->m_InferInfo->DataSources.begin(); iter!=this->m_InferInfo->DataSources.end(); iter++){
- this->m_Infer->stop_release_source(iter->sourceId);
- iter->source_bin = NULL;
- }
- // 清除数据
- this->m_InferInfo->DataSources.clear();
- this->m_Infer->RestartTask();
- }
- }
- /**
- * @description: 赋值struct sockaddr
- * @param {sockaddr} *out
- * @param {char} *ip
- * @param {uint16_t} port
- * @return {*}
- * @author: lishengyin
- */
- void UserApp::makeAddr(struct sockaddr *out,const char *ip,uint16_t port){
- struct sockaddr_in &servaddr = *((struct sockaddr_in *)out);
- servaddr.sin_family = AF_INET;
- servaddr.sin_port = htons(port);
- servaddr.sin_addr.s_addr = inet_addr(ip);
- bzero(&(servaddr.sin_zero), sizeof servaddr.sin_zero);
- }
-
- /**
- * @description: 监听推理模块广播
- * @param {int} Source_id 数据源ID
- * @param {CNStreamInferData} 推理指针
- * @return {*}
- * @author: lishengyin
- */
- void UserApp::ListenInfer(int Source_id, CNStreamInferData::Ptr data){
- if(data == nullptr) return;
- std::string json;
- data->dv_Id = this->m_InferInfo->DataSources[Source_id].dv_Id;
- data->objectToJson(json);
- if(this->m_httpClient->Post_Bird(json) != OK){
- ErrorL << "请求发送失败" << endl;
- }
- }
- /**
- * @description: 驱鸟控制器
- * @param {int} Source_id
- * @param {int} num
- * @return {*}
- */
- void UserApp::ExperControler(int Source_id, int num){
- static std::shared_ptr<Ticker> ticker = nullptr;
- if(num > 0){
- if(ticker != nullptr) ticker->resetTime();
- if(this -> m_DevPowerState == false){
- // 控制驱鸟设备
- if(this->m_httpClient != nullptr) this->m_httpClient->ControlDev(ExpelControler().fire);
- // 更新状态
- this -> m_DevPowerState = true;
- }
- }else{
- if(ticker == nullptr) ticker = std::make_shared<Ticker>();
- int time = 10000;
- if((int)(ticker->elapsedTime()) > time && this->m_DevPowerState){
- this->m_DevPowerState = false;
- ticker = nullptr;
- // CLOSE
- if(this->m_httpClient != nullptr) this->m_httpClient->ControlDev(ExpelControler().close);
- }
- }
- }
- }
|