user_app.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. /*
  2. * @Description: USERAPP
  3. * @Version: 1.0
  4. * @Autor: lishengyin
  5. * @Date: 2021-10-13 09:35:42
  6. * @LastEditors: lishengyin
  7. * @LastEditTime: 2022-07-20 16:07:28
  8. */
  9. #include "user_app.h"
  10. struct timespec time1 = {0, 0};
  11. struct timespec time2 = {0, 0};
  12. struct timespec time3 = {0, 0};
  13. namespace gsd_ds
  14. {
  15. /**
  16. * @description: 创建对象
  17. * @param {*}
  18. * @return {*}
  19. * @author: lishengyin
  20. */
  21. std::shared_ptr<UserApp> UserApp::CreateNew(){
  22. return std::make_shared<UserApp>();
  23. }
  24. /**
  25. * @description: UserApp()
  26. * @return {*}
  27. */
  28. UserApp::UserApp(){
  29. }
  30. /**
  31. * @description: ~UserApp()
  32. * @return {*}
  33. */
  34. UserApp::~UserApp(){
  35. }
  36. /**
  37. * @description: 初始化
  38. * @param {string} appName
  39. * @return {*}
  40. * @author: lishengyin
  41. */
  42. int32_t UserApp::Init(std::string appName)
  43. {
  44. this->m_appName = appName;
  45. Disposition::getPtr()->updateData();
  46. recorder::getPtr()->outDir = Disposition::getPtr()->getOutDir();
  47. // 链接Mysql
  48. #if defined(SUPPORT_DYNAMIC_TEMPLATE)
  49. //初始化数据
  50. SqlPool::Instance().Init(Disposition::getPtr()->getMysqlIP(),Disposition::getPtr()->getMysqlPort(),"",Disposition::getPtr()->getUser(),Disposition::getPtr()->getPassword()/*,character*/);
  51. #else
  52. //由于需要编译器对可变参数模板的支持,所以gcc5.0以下一般都不支持,否则编译报错
  53. ErrorL << "your compiler does not support variable parameter templates!" << endl;
  54. return -1;
  55. #endif //defined(SUPPORT_DYNAMIC_TEMPLATE)
  56. // 初始化数据库连接池
  57. SqlPool::Instance().setSize(3 + thread::hardware_concurrency());
  58. this->m_InferInfo = InferInfo::CreateNew();
  59. // 初始化Deepstream
  60. m_Infer = Inference::CreateNew();
  61. if(m_Infer == NULL){
  62. ErrorL << "Inference module creation failed!" << endl;
  63. return ERR;
  64. }
  65. SqlWriter sqlSelect("SELECT Id,Uri,RecognitionRange,dv_Id FROM gsdDB.`DataSources` WHERE Play = 1 and Del = 0");
  66. vector<vector<string>> sqlRet;
  67. sqlSelect << sqlRet;
  68. if(!sqlRet.empty()){
  69. int sourceId = 0;
  70. for(auto &line : sqlRet){
  71. DataSource data;
  72. data.Id = std::atoi(line[0].c_str());
  73. data.uri = line[1];
  74. data.range = line[2];
  75. data.dv_Id = std::atoi(line[3].c_str());
  76. data.Play = true;
  77. data.sourceId = sourceId;
  78. this->m_InferInfo->DataSources.push_back(data);
  79. sourceId++;
  80. }
  81. if(m_Infer->Init() != OK){
  82. ErrorL << "Inference module initialization failed";
  83. return ERR;
  84. }
  85. }else{
  86. InfoL << "The stream to be played is not found, please add it in the background.";
  87. }
  88. NoticeCenter::Instance().addListener(0,NOTICE_QUITLOOP,
  89. [&](){
  90. this->m_Infer->QuitLoop();
  91. this->stop = true;
  92. this->condition.notify_one();
  93. });
  94. // 监听推理广播
  95. NoticeCenter::Instance().addListener(0,NOTICE_INFER,
  96. [&](int Source_id, CNStreamInferData::Ptr data){
  97. this->ListenInfer(Source_id, data);
  98. });
  99. NoticeCenter::Instance().addListener(0,NOTICE_DEV,
  100. [&](int Source_id, int num){
  101. this->ExperControler(Source_id, num);
  102. });
  103. this->m_httpClient = HttpClient::CreateNew();
  104. if(this->m_httpClient == nullptr) ErrorL << "无法创建HttpClinet对象" << endl;
  105. if(this->m_httpClient->Init("admin", "0.0.0.0", 9080) != OK){
  106. ErrorL << "HttpClinet对象初始化失败" << endl;
  107. }
  108. return OK;
  109. }
  110. /**
  111. * @description: 释放数据
  112. * @param {*}
  113. * @return {*}
  114. * @author: lishengyin
  115. */
  116. void UserApp::Destroy()
  117. {
  118. this->m_Infer->Destory();
  119. InfoL << "System exited successfully!" << endl;
  120. }
  121. /**
  122. * @description: 启动任务
  123. * @param {*}
  124. * @return {*}
  125. * @author: lishengyin
  126. */
  127. void UserApp::StartTask()
  128. {
  129. // 定时监控数据变化
  130. this->m_timer1 = std::make_shared<Timer>(3.0f,[&](){
  131. // 监听数据
  132. this->MonitorData();
  133. return true;
  134. },nullptr);
  135. InfoL << "Task started successfully!" << endl;
  136. // 启动推理
  137. this->StartInfer();
  138. }
  139. /**
  140. * @description: StartInfer
  141. * @return {*}
  142. */
  143. void UserApp::StartInfer(){
  144. InfoL;
  145. while(!stop){
  146. if(m_InferInfo->DataSources.empty()){
  147. std::unique_lock<std::mutex> lock(this->m_mutex);
  148. this->condition.wait(lock, [this]{
  149. return this->stop || !m_InferInfo->DataSources.empty();
  150. });
  151. if(stop) break;
  152. }
  153. if(!m_InferInfo->DataSources.empty() && !stop) m_Infer->StartTask();
  154. if(stop) break;
  155. }
  156. }
  157. /**
  158. * 监听数据
  159. * @param {*}
  160. * @return {*}
  161. */
  162. void UserApp::MonitorData()
  163. {
  164. static int flag = 0;
  165. vector<vector<string>> sqlRet;
  166. vector<DataSource>::iterator iter;
  167. // 查询
  168. std::string sql = "SELECT Id,Uri,RecognitionRange,dv_Id FROM gsdDB.`DataSources` WHERE Del = 0 and Play = 1";
  169. SqlWriter sqlSelect(sql.c_str());
  170. sqlSelect << sqlRet;
  171. if(!sqlRet.empty()){
  172. int sourceId = 0;
  173. if(this->m_InferInfo->DataSources.empty())
  174. {
  175. for(auto &line : sqlRet){
  176. DataSource data;
  177. data.Id = std::atoi(line[0].c_str());
  178. data.uri = line[1];
  179. data.range = line[2];
  180. data.dv_Id = std::atoi(line[3].c_str());
  181. data.Play = true;
  182. data.sourceId = sourceId;
  183. this->m_InferInfo->DataSources.push_back(data);
  184. sourceId++;
  185. }
  186. if(!this->m_Infer->getAlive()){
  187. // 初始化Deepstream
  188. if(this->m_Infer->Init() != OK){
  189. ErrorL << "Inference module initialization failed" << endl;
  190. }
  191. condition.notify_one();
  192. }else{
  193. // 添加数据源
  194. for(auto& data : this->m_InferInfo->DataSources){
  195. data.source_bin = this->m_Infer->add_sources(data.sourceId, data.uri);
  196. }
  197. this->m_Infer->RestartTask();
  198. }
  199. return;
  200. }
  201. int size = this->m_InferInfo->DataSources.size();
  202. for(int i = 0; i < size; i++)
  203. {
  204. if(i >= (int)sqlRet.size()) continue;
  205. this->m_InferInfo->DataSources[i].Id = atoi(sqlRet[i][0].c_str());
  206. this->m_InferInfo->DataSources[i].range = sqlRet[i][2];
  207. this->m_InferInfo->DataSources[i].Play = true;
  208. this->m_InferInfo->DataSources[i].dv_Id = atoi(sqlRet[i][3].c_str());
  209. if(this->m_InferInfo->DataSources[i].uri != sqlRet[i][1]){
  210. this->m_Infer->ModifyUri(this->m_InferInfo->DataSources[i].source_bin, sqlRet[i][1]);
  211. flag = 1;
  212. }
  213. this->m_InferInfo->DataSources[i].uri = sqlRet[i][1];
  214. }
  215. if(size == (int)sqlRet.size()) {
  216. if(flag) this->m_Infer->RestartTask();
  217. flag = 0;
  218. return;
  219. }
  220. if(size > (int)sqlRet.size()){
  221. int num = 0;
  222. // stop
  223. for(int i = sqlRet.size(); i < size; i++){
  224. this->m_Infer->stop_release_source(i);
  225. this->m_InferInfo->DataSources[i].source_bin = NULL;
  226. num++;
  227. }
  228. for(int i = 0; i < num; i++) this->m_InferInfo->DataSources.pop_back();
  229. }else{
  230. // add
  231. for(int i = size; i < (int)sqlRet.size(); i++){
  232. DataSource data;
  233. data.Id = std::atoi(sqlRet[i][0].c_str());
  234. data.uri = sqlRet[i][1];
  235. data.range = sqlRet[i][2];
  236. data.dv_Id = std::atoi(sqlRet[i][3].c_str());
  237. data.Play = true;
  238. data.sourceId = i;
  239. data.source_bin = this->m_Infer->add_sources(data.sourceId, data.uri);
  240. this->m_InferInfo->DataSources.push_back(data);
  241. }
  242. }
  243. this->m_Infer->RestartTask();
  244. this->m_Infer->SetBatch(this->m_InferInfo->DataSources.size());
  245. }else{
  246. if(this->m_InferInfo->DataSources.empty() && !this->m_Infer->getAlive()) return;
  247. for(iter=this->m_InferInfo->DataSources.begin(); iter!=this->m_InferInfo->DataSources.end(); iter++){
  248. this->m_Infer->stop_release_source(iter->sourceId);
  249. iter->source_bin = NULL;
  250. }
  251. // 清除数据
  252. this->m_InferInfo->DataSources.clear();
  253. this->m_Infer->RestartTask();
  254. }
  255. }
  256. /**
  257. * @description: 赋值struct sockaddr
  258. * @param {sockaddr} *out
  259. * @param {char} *ip
  260. * @param {uint16_t} port
  261. * @return {*}
  262. * @author: lishengyin
  263. */
  264. void UserApp::makeAddr(struct sockaddr *out,const char *ip,uint16_t port){
  265. struct sockaddr_in &servaddr = *((struct sockaddr_in *)out);
  266. servaddr.sin_family = AF_INET;
  267. servaddr.sin_port = htons(port);
  268. servaddr.sin_addr.s_addr = inet_addr(ip);
  269. bzero(&(servaddr.sin_zero), sizeof servaddr.sin_zero);
  270. }
  271. /**
  272. * @description: 监听推理模块广播
  273. * @param {int} Source_id 数据源ID
  274. * @param {CNStreamInferData} 推理指针
  275. * @return {*}
  276. * @author: lishengyin
  277. */
  278. void UserApp::ListenInfer(int Source_id, CNStreamInferData::Ptr data){
  279. if(data == nullptr) return;
  280. std::string json;
  281. data->dv_Id = this->m_InferInfo->DataSources[Source_id].dv_Id;
  282. data->objectToJson(json);
  283. if(this->m_httpClient->Post_Bird(json) != OK){
  284. ErrorL << "请求发送失败" << endl;
  285. }
  286. }
  287. /**
  288. * @description: 驱鸟控制器
  289. * @param {int} Source_id
  290. * @param {int} num
  291. * @return {*}
  292. */
  293. void UserApp::ExperControler(int Source_id, int num){
  294. static std::shared_ptr<Ticker> ticker = nullptr;
  295. if(num > 0){
  296. if(ticker != nullptr) ticker->resetTime();
  297. if(this -> m_DevPowerState == false){
  298. // 控制驱鸟设备
  299. if(this->m_httpClient != nullptr) this->m_httpClient->ControlDev(ExpelControler().fire);
  300. // 更新状态
  301. this -> m_DevPowerState = true;
  302. }
  303. }else{
  304. if(ticker == nullptr) ticker = std::make_shared<Ticker>();
  305. int time = 10000;
  306. if((int)(ticker->elapsedTime()) > time && this->m_DevPowerState){
  307. this->m_DevPowerState = false;
  308. ticker = nullptr;
  309. // CLOSE
  310. if(this->m_httpClient != nullptr) this->m_httpClient->ControlDev(ExpelControler().close);
  311. }
  312. }
  313. }
  314. }