// main.cpp

/*************************************************************************
 * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *************************************************************************/
#include <gflags/gflags.h>
#include <opencv2/highgui/highgui.hpp>
#include <opencv2/imgproc/imgproc.hpp>
#if (CV_MAJOR_VERSION >= 3)
#include <opencv2/imgcodecs/imgcodecs.hpp>
#endif
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <fstream>
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include "cnstream_version.hpp"
#include "data_source.hpp"
#ifdef HAVE_DISPLAY
#include "displayer.hpp"
#endif
#include "cnstream_logging.hpp"
#include "util.hpp"
#include "profiler/pipeline_profiler.hpp"
#include "profiler/profile.hpp"
#include "profiler/trace_serialize_helper.hpp"
DEFINE_string(data_path, "", "video file list.");
DEFINE_string(data_name, "", "video file name.");
DEFINE_int32(src_frame_rate, 25, "frame rate for sending data");
DEFINE_int32(maximum_video_width, -1, "maximum video width, for variable video resolutions, "
             "not supported on MLU220/MLU270");
DEFINE_int32(maximum_video_height, -1, "maximum video height, for variable video resolutions, "
             "not supported on MLU220/MLU270");
DEFINE_int32(maximum_image_width, 7680, "maximum image width, valid when jpeg_from_mem is true");
DEFINE_int32(maximum_image_height, 4320, "maximum image height, valid when jpeg_from_mem is true");
DEFINE_int32(wait_time, 0, "duration of one test case, in seconds");
DEFINE_bool(loop, false, "replay the input repeatedly");
DEFINE_string(config_fname, "", "pipeline config filename");
DEFINE_bool(jpeg_from_mem, true, "feed jpeg bitstream from memory");
DEFINE_bool(raw_img_input, false, "feed decompressed image to source");
DEFINE_bool(use_cv_mat, true, "feed cv::Mat to source. It is valid only if ``raw_img_input`` is set to true");
DEFINE_string(trace_data_dir, "", "dump trace data to specified dir. An empty string means that no data is stored");
#ifdef HAVE_DISPLAY
cnstream::Displayer *gdisplayer = nullptr;
#endif

std::atomic<bool> thread_running{true};
std::atomic<bool> gstop_perf_print{false};
std::mutex gmutex_for_add_source;

class UserLogSink : public cnstream::LogSink {
 public:
  void Send(cnstream::LogSeverity severity, const char *category, const char *filename, int line,
            const struct ::tm *tm_time, int32_t usecs, const char *message, size_t message_len) override {
    std::cout << "UserLogSink: " << ToString(severity, category, filename, line, tm_time, usecs, message, message_len)
              << std::endl;
  }
};
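
// Observes messages posted by the pipeline: on EOS or a stream error the corresponding
// source is removed; once every registered stream has finished (or an unrecoverable
// ERROR_MSG arrives), WaitForStop() unblocks and stops the pipeline.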
class MsgObserver : public cnstream::StreamMsgObserver {
 public:
  MsgObserver(cnstream::Pipeline *pipeline, std::string source_name)
      : pipeline_(pipeline), source_name_(source_name) {}

  void Update(const cnstream::StreamMsg &smsg) override {
    std::lock_guard<std::mutex> add_src_lg(gmutex_for_add_source);
    std::lock_guard<std::mutex> lg(mutex_);
    if (stop_) return;
    cnstream::DataSource *source = nullptr;
    source = dynamic_cast<cnstream::DataSource *>(pipeline_->GetModule(source_name_));
    switch (smsg.type) {
      case cnstream::StreamMsgType::EOS_MSG:
        LOGI(DEMO) << "[" << pipeline_->GetName() << "] received EOS message from stream: [" << smsg.stream_id << "]";
        if (stream_set_.find(smsg.stream_id) != stream_set_.end()) {
          if (source) source->RemoveSource(smsg.stream_id);
          stream_set_.erase(smsg.stream_id);
        }
        if (stream_set_.empty()) {
          LOGI(DEMO) << "[" << pipeline_->GetName() << "] received all EOS";
          stop_ = true;
        }
        break;
      case cnstream::StreamMsgType::STREAM_ERR_MSG:
        LOGW(DEMO) << "[" << pipeline_->GetName() << "] received stream error from stream: " << smsg.stream_id
                   << ", remove it from pipeline.";
        if (stream_set_.find(smsg.stream_id) != stream_set_.end()) {
          if (source) source->RemoveSource(smsg.stream_id, true);
          stream_set_.erase(smsg.stream_id);
        }
        if (stream_set_.empty()) {
          LOGI(DEMO) << "[" << pipeline_->GetName()
                     << "] all streams have been removed from the pipeline, pipeline will stop.";
          stop_ = true;
        }
        break;
      case cnstream::StreamMsgType::ERROR_MSG:
        if (source) source->RemoveSources(true);
        stream_set_.clear();
        stop_ = true;
        break;
      case cnstream::StreamMsgType::FRAME_ERR_MSG:
        LOGW(DEMO) << "[" << pipeline_->GetName() << "] received frame error from stream: " << smsg.stream_id
                   << ", pts: " << smsg.pts << ".";
        break;
      default:
        LOGE(DEMO) << "[" << pipeline_->GetName() << "] unknown message type.";
        break;
    }
    if (stop_) {
      wakener_.notify_one();
    }
  }

  void WaitForStop() {
    std::unique_lock<std::mutex> lk(mutex_);
    if (stream_set_.empty()) {
      stop_ = true;
    }
    wakener_.wait(lk, [this]() { return stop_.load(); });
    lk.unlock();
    pipeline_->Stop();
  }
  void IncreaseStream(std::string stream_id) {
    std::unique_lock<std::mutex> lk(mutex_);
    if (stream_set_.find(stream_id) != stream_set_.end()) {
      LOGF(DEMO) << "IncreaseStream(): stream [" << stream_id << "] is already in the stream set";
    }
    stream_set_.insert(stream_id);
    if (stop_) stop_ = false;
  }
 private:
  cnstream::Pipeline *pipeline_ = nullptr;
  std::string source_name_;
  std::atomic<bool> stop_{false};
  std::set<std::string> stream_set_;
  std::condition_variable wakener_;
  mutable std::mutex mutex_;
};

int AddSourceForRtspStream(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
                           const cnstream::MaximumVideoResolution &maximum_resolution) {
  auto handler = cnstream::RtspHandler::Create(source, stream_id, filename, false, 10, maximum_resolution);
  return source->AddSource(handler);
}

/**
 * @brief Adds a source for a USB camera.
 *
 * @note Tested with a Logitech C505e 720P USB camera on Ubuntu 18.04.5.
 * Required steps:
 * 1. Compile x264
 *      git clone https://code.videolan.org/videolan/x264.git
 *      cd x264
 *      ./configure --enable-shared --prefix=/usr/local/x264 --disable-asm
 *      make
 *      sudo make install
 * 2. Compile FFmpeg
 *      wget http://www.ffmpeg.org/releases/ffmpeg-3.4.8.tar.xz
 *      tar xvf ffmpeg-3.4.8.tar.xz
 *      cd ffmpeg-3.4.8
 *      export PKG_CONFIG_PATH=/usr/local/x264/lib/pkgconfig
 *      ./configure \
 *        --prefix=/usr/local/ \
 *        --enable-shared \
 *        --enable-static \
 *        --enable-gpl \
 *        --enable-nonfree \
 *        --enable-ffmpeg \
 *        --disable-ffplay \
 *        --enable-swscale \
 *        --pkg-config="pkg-config --static" \
 *        --enable-pthreads \
 *        --disable-armv5te \
 *        --disable-armv6 \
 *        --disable-armv6t2 \
 *        --disable-yasm \
 *        --disable-stripping \
 *        --enable-libx264 \
 *        --enable-libv4l2 \
 *        --extra-cflags=-I/usr/local/x264/include \
 *        --extra-ldflags=-L/usr/local/x264/lib
 *      make -j4
 *      sudo make install
 * 3. Modify modules/CMakeLists.txt: turn WITH_FFMPEG_AVDEVICE to ON
 * 4. Compile CNStream
 * 5. Run the USB camera demo
 *      cd samples/cns_launcher/object_detection
 *      export LD_LIBRARY_PATH=/usr/local/x264/lib:$LD_LIBRARY_PATH
 *      run.sh [mlu220/mlu270] [encode_jpeg/encode_video/display/rtsp] usb
 *
 * Other USB cameras and operating systems have not been tested; the steps above
 * may serve as a reference. See also the note after this function.
 */
int AddSourceForUsbCam(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
                       const int &frame_rate, const bool &loop,
                       const cnstream::MaximumVideoResolution &maximum_resolution) {
  int ret = -1;
  auto handler = cnstream::FileHandler::Create(source, stream_id, filename, frame_rate, loop, maximum_resolution);
  ret = source->AddSource(handler);
  return ret;
}
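
// Note: the USB camera path reuses cnstream::FileHandler; the V4L2 device node
// (e.g. "/dev/video0") is passed as the filename, which is presumably why the FFmpeg
// build described above needs avdevice/libv4l2 support (WITH_FFMPEG_AVDEVICE=ON).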

// Futures of the threads that feed bitstream/image data into the pipeline from memory.
std::vector<std::future<int>> gFeedMemFutures;

int AddSourceForVideoInMem(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
                           const bool &loop, const cnstream::MaximumVideoResolution &maximum_resolution) {
  FILE *fp = fopen(filename.c_str(), "rb");
  if (!fp) {
    LOGE(DEMO) << "Open file failed. file name : " << filename;
    return -1;
  }
  auto handler = cnstream::ESMemHandler::Create(source, stream_id, maximum_resolution);
  if (source->AddSource(handler)) {
    LOGE(DEMO) << "failed to add " << stream_id;
    return -1;
  }
  // Start another thread to read the file's binary data into memory and feed it to the pipeline.
  gFeedMemFutures.emplace_back(std::async(std::launch::async, [=]() {
    auto memHandler = std::dynamic_pointer_cast<cnstream::ESMemHandler>(handler);
    memHandler->SetDataType(cnstream::ESMemHandler::DataType::H264);
    unsigned char buf[4096];
    while (thread_running.load()) {
      if (!feof(fp)) {
        int size = fread(buf, 1, 4096, fp);
        if (memHandler->Write(buf, size) != 0) {
          break;
        }
      } else if (loop) {
        fseek(fp, 0, SEEK_SET);
      } else {
        break;
      }
    }
    if (!feof(fp)) {
      memHandler->WriteEos();
    } else {
      memHandler->Write(nullptr, 0);
    }
    fclose(fp);
    return 0;
  }));
  return 0;
}

int AddSourceForImageInMem(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
                           const bool &loop) {
  int index = filename.find_last_of("/");
  std::string dir_path = filename.substr(0, index);
  std::list<std::string> files = GetFileNameFromDir(dir_path, "*.jpg");
  if (files.empty()) {
    LOGE(DEMO) << "no .jpg files found in " << dir_path;
    return -1;
  }
  auto handler = cnstream::ESJpegMemHandler::Create(source, stream_id,
                                                    FLAGS_maximum_image_width, FLAGS_maximum_image_height);
  if (source->AddSource(handler)) {
    LOGE(DEMO) << "failed to add " << stream_id;
    return -1;
  }
  // Start another thread to read the files' binary data into memory and feed it to the pipeline.
  gFeedMemFutures.emplace_back(std::async(std::launch::async, [files, handler, loop]() {
    auto memHandler = std::dynamic_pointer_cast<cnstream::ESJpegMemHandler>(handler);
    auto iter = files.begin();
    size_t jpeg_buffer_size = GetFileSize(*iter);
    std::unique_ptr<unsigned char[]> buf(new unsigned char[jpeg_buffer_size]);
    cnstream::ESPacket pkt;
    uint64_t pts = 0;
    while (thread_running.load() && iter != files.end()) {
      size_t file_size = GetFileSize(*iter);
      if (file_size > jpeg_buffer_size) {
        // Grow the buffer when a larger jpeg is met.
        buf.reset(new unsigned char[file_size]);
        jpeg_buffer_size = file_size;
      }
      std::ifstream file_stream(*iter, std::ios::binary);
      if (!file_stream.is_open()) {
        LOGW(DEMO) << "failed to open " << (*iter);
      } else {
        file_stream.read(reinterpret_cast<char *>(buf.get()), file_size);
        pkt.data = buf.get();
        pkt.size = file_size;
        pkt.pts = pts++;
        if (memHandler->Write(&pkt) != 0) {
          break;
        }
      }
      if (++iter == files.end() && loop) {
        iter = files.begin();
      }
    }
    // Send an EOS packet to finish the stream.
    pkt.data = nullptr;
    pkt.size = 0;
    pkt.flags = static_cast<size_t>(cnstream::ESPacket::FLAG::FLAG_EOS);
    memHandler->Write(&pkt);
    return 0;
  }));
  return 0;
}

int AddSourceForDecompressedImage(cnstream::DataSource *source, const std::string &stream_id,
                                  const std::string &filename, const bool &loop, const bool &use_cv_mat) {
  // The following code is only for image input. For video input, you could use OpenCV VideoCapture
  // (a hedged sketch is given after this function).
  int index = filename.find_last_of("/");
  std::string dir_path = filename.substr(0, index);
  std::list<std::string> files = GetFileNameFromDir(dir_path, "*.jpg");
  if (files.empty()) {
    LOGE(DEMO) << "no .jpg files found in " << dir_path;
    return -1;
  }
  auto handler = cnstream::RawImgMemHandler::Create(source, stream_id);
  if (source->AddSource(handler)) {
    LOGE(DEMO) << "failed to add " << stream_id;
    return -1;
  }
  // Start another thread to read data from files into cv::Mat and feed the data to the pipeline.
  gFeedMemFutures.emplace_back(std::async(std::launch::async, [files, handler, loop, use_cv_mat]() {
    auto memHandler = std::dynamic_pointer_cast<cnstream::RawImgMemHandler>(handler);
    auto iter = files.begin();
    int ret_code = -1;
    uint64_t pts = 0;
    while (thread_running.load() && iter != files.end()) {
      cv::Mat bgr_frame = cv::imread(*iter);
      if (!bgr_frame.empty()) {
        if (use_cv_mat) {
          // Feed a bgr24 image mat, with Write(cv::Mat *mat, uint64_t pts).
          ret_code = memHandler->Write(&bgr_frame, pts++);
        } else {
          // Feed rgb24 image data, with Write(unsigned char *data, int size, uint64_t pts, int w, int h,
          // cnstream::CNDataFormat).
          cv::Mat rgb_frame(bgr_frame.rows, bgr_frame.cols, CV_8UC3);
          cv::cvtColor(bgr_frame, rgb_frame, cv::COLOR_BGR2RGB);
          ret_code = memHandler->Write(rgb_frame.data, rgb_frame.cols * rgb_frame.rows * 3, pts++, rgb_frame.cols,
                                       rgb_frame.rows, cnstream::CNDataFormat::CN_PIXEL_FORMAT_RGB24);
        }
        if (-2 == ret_code) {
          LOGW(DEMO) << "write image failed (invalid data).";
        }
      }
      if (++iter == files.end() && loop) {
        iter = files.begin();
      }
    }
    // A null write marks the end of this stream.
    memHandler->Write(nullptr, 0);
    return 0;
  }));
  return 0;
}
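
// A minimal sketch of the video-input variant mentioned above: decode frames with
// cv::VideoCapture and feed them through the same RawImgMemHandler Write(cv::Mat*, pts)
// path used by AddSourceForDecompressedImage. This is an illustration only, disabled
// with #if 0 so it does not affect the build; the function name is hypothetical.
#if 0
int AddSourceForDecompressedVideo(cnstream::DataSource *source, const std::string &stream_id,
                                  const std::string &filename) {
  auto handler = cnstream::RawImgMemHandler::Create(source, stream_id);
  if (source->AddSource(handler)) {
    LOGE(DEMO) << "failed to add " << stream_id;
    return -1;
  }
  gFeedMemFutures.emplace_back(std::async(std::launch::async, [filename, handler]() {
    auto memHandler = std::dynamic_pointer_cast<cnstream::RawImgMemHandler>(handler);
    cv::VideoCapture capture(filename);  // decode on CPU with OpenCV/FFmpeg
    if (!capture.isOpened()) {
      LOGE(DEMO) << "failed to open " << filename;
      memHandler->Write(nullptr, 0);
      return -1;
    }
    cv::Mat bgr_frame;
    uint64_t pts = 0;
    while (thread_running.load() && capture.read(bgr_frame)) {
      // Feed each decoded bgr24 frame, same API as the image path above.
      if (memHandler->Write(&bgr_frame, pts++) != 0) break;
    }
    memHandler->Write(nullptr, 0);  // end of stream
    return 0;
  }));
  return 0;
}
#endif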

int AddSourceForFile(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
                     const int &frame_rate, const bool &loop,
                     const cnstream::MaximumVideoResolution &maximum_resolution) {
  auto handler = cnstream::FileHandler::Create(source, stream_id, filename, frame_rate, loop, maximum_resolution);
  return source->AddSource(handler);
}
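
// Overall flow of main(): parse flags, init logging, read the url list, build the pipeline
// from the JSON config, register the message observer, start the pipeline, optionally start
// the profiling/trace printer, add one source per url, wait for all streams to finish
// (or for the GUI loop / wait_time), then merge and dump the trace data.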
int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, false);
  cnstream::InitCNStreamLogging("/home/GSD/log/cnstream/");

#if 0
  UserLogSink log_listener;
  cnstream::AddLogSink(&log_listener);
#endif

  LOGI(DEMO) << "CNSTREAM VERSION:" << cnstream::VersionString();

  /*
    flags to variables
  */
  std::list<std::string> video_urls;
  if (FLAGS_data_name != "") {
    video_urls = {FLAGS_data_name};
  } else {
    video_urls = ::ReadFileList(FLAGS_data_path);
  }
  std::string source_name = "source";  // source module name, which is defined in pipeline json config

  /*
    build pipeline
  */
  cnstream::Pipeline pipeline("MyPipeline");
  if (!pipeline.BuildPipelineByJSONFile(FLAGS_config_fname)) {
    LOGE(DEMO) << "Build pipeline failed.";
    return EXIT_FAILURE;
  }

  /*
    message observer
  */
  MsgObserver msg_observer(&pipeline, source_name);
  pipeline.SetStreamMsgObserver(&msg_observer);

  /*
    find data source
  */
  cnstream::DataSource *source = dynamic_cast<cnstream::DataSource *>(pipeline.GetModule(source_name));
  if (nullptr == source) {
    LOGE(DEMO) << "DataSource module not found.";
    return EXIT_FAILURE;
  }

  /*
    start pipeline
  */
  if (!pipeline.Start()) {
    LOGE(DEMO) << "Pipeline start failed.";
    return EXIT_FAILURE;
  }

  /*
    start printing performance information
  */
  std::future<void> perf_print_th_ret;
  int trace_data_file_cnt = 0;
  if (pipeline.IsProfilingEnabled()) {
    perf_print_th_ret = std::async(std::launch::async, [&pipeline, &trace_data_file_cnt] {
      cnstream::Time last_time = cnstream::Clock::now();
      int trace_data_dump_times = 0;
      cnstream::TraceSerializeHelper trace_dumper;
      while (!gstop_perf_print) {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        ::PrintPipelinePerformance("Whole", pipeline.GetProfiler()->GetProfile());
        if (pipeline.IsTracingEnabled()) {
          cnstream::Duration duration(2000);
          ::PrintPipelinePerformance("Last two seconds",
                                     pipeline.GetProfiler()->GetProfileBefore(cnstream::Clock::now(), duration));
          if (!FLAGS_trace_data_dir.empty()) {
            cnstream::Time now_time = cnstream::Clock::now();
            trace_dumper.Serialize(pipeline.GetTracer()->GetTrace(last_time, now_time));
            last_time = now_time;
            if (++trace_data_dump_times == 10) {
              trace_dumper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data_" +
                                  std::to_string(trace_data_file_cnt++));
              trace_dumper.Reset();
              trace_data_dump_times = 0;
            }
          }
        }
      }
      if (pipeline.IsTracingEnabled() && !FLAGS_trace_data_dir.empty() && trace_data_dump_times) {
        trace_dumper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data_" + std::to_string(trace_data_file_cnt++));
        trace_dumper.Reset();
      }
    });
  }

  /*
    add stream sources...
  */
  cnstream::MaximumVideoResolution maximum_video_resolution;
  maximum_video_resolution.maximum_width = FLAGS_maximum_video_width;
  maximum_video_resolution.maximum_height = FLAGS_maximum_video_height;
  maximum_video_resolution.enable_variable_resolutions =
      FLAGS_maximum_video_width != -1 && FLAGS_maximum_video_height != -1;
  int streams = static_cast<int>(video_urls.size());
  auto url_iter = video_urls.begin();
  for (int i = 0; i < streams; i++, url_iter++) {
    const std::string &filename = *url_iter;
    std::string stream_id = "stream_" + std::to_string(i);
    int ret = 0;
    std::unique_lock<std::mutex> lk(gmutex_for_add_source);
    if (nullptr != source) {
      if (filename.find("rtsp://") != std::string::npos) {
        ret = AddSourceForRtspStream(source, stream_id, filename, maximum_video_resolution);
      } else if (filename.find("/dev/video") != std::string::npos) {  // only supported on Linux
        ret = AddSourceForUsbCam(source, stream_id, filename, FLAGS_src_frame_rate,
                                 FLAGS_loop, maximum_video_resolution);
      } else if (filename.find(".jpg") != std::string::npos && FLAGS_jpeg_from_mem) {
        ret = AddSourceForImageInMem(source, stream_id, filename, FLAGS_loop);
      } else if (filename.find(".jpg") != std::string::npos && FLAGS_raw_img_input) {
        ret = AddSourceForDecompressedImage(source, stream_id, filename, FLAGS_loop, FLAGS_use_cv_mat);
      } else if (filename.find(".h264") != std::string::npos) {
        ret = AddSourceForVideoInMem(source, stream_id, filename, FLAGS_loop, maximum_video_resolution);
      } else {
        ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate, FLAGS_loop,
                               maximum_video_resolution);
      }
    }
    if (ret == 0) {
      msg_observer.IncreaseStream(stream_id);
    }
  }

#ifdef HAVE_DISPLAY
  auto quit_callback = [&pipeline, &source, &msg_observer]() {
    // Stop the feed-data threads before removing sources.
    thread_running.store(false);
    for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) {
      gFeedMemFutures[i].wait();
    }
    msg_observer.WaitForStop();
  };
  gdisplayer = dynamic_cast<cnstream::Displayer *>(pipeline.GetModule("displayer"));
  if (gdisplayer && gdisplayer->Show()) {
    gdisplayer->GUILoop(quit_callback);
#else
  if (false) {
#endif
  } else {
    /*
     * close pipeline
     */
    if (FLAGS_loop) {
      // Stop by hand or when FLAGS_wait_time expires.
      if (FLAGS_wait_time) {
        std::this_thread::sleep_for(std::chrono::seconds(FLAGS_wait_time));
        LOGI(DEMO) << "wait_time expired, quit...";
      } else {
        getchar();
        LOGI(DEMO) << "received a character from stdin, quit...";
      }
      thread_running.store(false);
      if (nullptr != source) {
        source->RemoveSources();
      }
      for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) {
        gFeedMemFutures[i].wait();
      }
      msg_observer.WaitForStop();
    } else {
      // Stop automatically after all streams reach EOS.
      msg_observer.WaitForStop();
      thread_running.store(false);
      for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) {
        gFeedMemFutures[i].wait();
      }
    }
  }
  gFeedMemFutures.clear();
  cnstream::ShutdownCNStreamLogging();

  if (pipeline.IsProfilingEnabled()) {
    gstop_perf_print = true;
    perf_print_th_ret.get();
    ::PrintPipelinePerformance("Whole", pipeline.GetProfiler()->GetProfile());
  }
  if (pipeline.IsTracingEnabled() && !FLAGS_trace_data_dir.empty()) {
    LOGI(DEMO) << "Wait for trace data merge ...";
    cnstream::TraceSerializeHelper helper;
    for (int file_index = 0; file_index < trace_data_file_cnt; ++file_index) {
      std::string filename = FLAGS_trace_data_dir + "/cnstream_trace_data_" + std::to_string(file_index);
      cnstream::TraceSerializeHelper t;
      cnstream::TraceSerializeHelper::DeserializeFromJSONFile(filename, &t);
      helper.Merge(t);
      remove(filename.c_str());
    }
    if (!helper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data.json")) {
      LOGE(DEMO) << "Dump trace data failed.";
    }
  }
  return EXIT_SUCCESS;
}