123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608 |
- #include <gflags/gflags.h>
- #include <opencv2/highgui/highgui.hpp>
- #include <opencv2/imgproc/imgproc.hpp>
- #if (CV_MAJOR_VERSION >= 3)
- #include <opencv2/imgcodecs/imgcodecs.hpp>
- #endif
- #include <atomic>
- #include <condition_variable>
- #include <fstream>
- #include <iostream>
- #include <list>
- #include <memory>
- #include <mutex>
- #include <sstream>
- #include <string>
- #include <set>
- #include <vector>
- #include "cnstream_version.hpp"
- #include "data_source.hpp"
- #include "displayer.hpp"
- #endif
- #include "cnstream_logging.hpp"
- #include "util.hpp"
- #include "profiler/pipeline_profiler.hpp"
- #include "profiler/profile.hpp"
- #include "profiler/trace_serialize_helper.hpp"
- DEFINE_string(data_path, "", "video file list.");
- DEFINE_string(data_name, "", "video file name.");
- DEFINE_int32(src_frame_rate, 25, "frame rate for send data");
- DEFINE_int32(maximum_video_width, -1, "maximum video width, for variable video resolutions, "
- "not supported on MLU220/MLU270");
- DEFINE_int32(maximum_video_height, -1, "maximum video height, for variable video resolutions, "
- "not supported on MLU220/MLU270");
- DEFINE_int32(maximum_image_width, 7680, "maximum image width, valid when jpeg_from_mem is true");
- DEFINE_int32(maximum_image_height, 4320, "maxmum image width, valid when jpeg_from_mem is true");
- DEFINE_int32(wait_time, 0, "time of one test case");
- DEFINE_bool(loop, false, "display repeat");
- DEFINE_string(config_fname, "", "pipeline config filename");
- DEFINE_bool(jpeg_from_mem, true, "Jpeg bitstream from mem.");
- DEFINE_bool(raw_img_input, false, "feed decompressed image to source");
- DEFINE_bool(use_cv_mat, true, "feed cv mat to source. It is valid only if ``raw_img_input`` is set to true");
- DEFINE_string(trace_data_dir, "", "dump trace data to specified dir. An empty string means that no data is stored");
// Displayer module of the pipeline; stays null until main() looks it up with GetModule("displayer").
- cnstream::Displayer *gdisplayer = nullptr;
// NOTE(review): this #endif closes a conditional-compilation block whose opening directive is not
// visible in this part of the file -- confirm the guard that is expected to precede gdisplayer.
- #endif
// While true, the background feeder tasks keep running their read/write loops; main() stores false to stop them.
- std::atomic<bool> thread_running{true};
// Set to true by main() to end the periodic performance-printing loop.
- std::atomic<bool> gstop_perf_print{false};
// Locked by MsgObserver::Update() and by main() while it adds a source, so the two never run concurrently.
- std::mutex gmutex_for_add_source;
- class UserLogSink : public cnstream::LogSink {
- public:
- void Send(cnstream::LogSeverity severity, const char *category, const char *filename, int line,
- const struct ::tm *tm_time, int32_t usecs, const char *message, size_t message_len) override {
- std::cout << "UserLogSink: " << ToString(severity, category, filename, line, tm_time, usecs, message, message_len)
- << std::endl;
- }
- };
- class MsgObserver : cnstream::StreamMsgObserver {
- public:
- MsgObserver(cnstream::Pipeline *pipeline, std::string source_name)
- : pipeline_(pipeline), source_name_(source_name) {}
- void Update(const cnstream::StreamMsg &smsg) override {
- std::lock_guard<std::mutex> add_src_lg(gmutex_for_add_source);
- std::lock_guard<std::mutex> lg(mutex_);
- if (stop_) return;
- cnstream::DataSource *source = nullptr;
- source = dynamic_cast<cnstream::DataSource *>(pipeline_->GetModule(source_name_));
- switch (smsg.type) {
- case cnstream::StreamMsgType::EOS_MSG:
- LOGI(DEMO) << "[" << pipeline_->GetName() << "] received EOS message from stream: [" << smsg.stream_id << "]";
- if (stream_set_.find(smsg.stream_id) != stream_set_.end()) {
- if (source) source->RemoveSource(smsg.stream_id);
- stream_set_.erase(smsg.stream_id);
- }
- if (stream_set_.empty()) {
- LOGI(DEMO) << "[" << pipeline_->GetName() << "] received all EOS";
- stop_ = true;
- }
- break;
- case cnstream::StreamMsgType::STREAM_ERR_MSG:
- LOGW(DEMO) << "[" << pipeline_->GetName() << "] received stream error from stream: " << smsg.stream_id
- << ", remove it from pipeline.";
- if (stream_set_.find(smsg.stream_id) != stream_set_.end()) {
- if (source) source->RemoveSource(smsg.stream_id, true);
- stream_set_.erase(smsg.stream_id);
- }
- if (stream_set_.empty()) {
- LOGI(DEMO) << "[" << pipeline_->GetName() << "] all streams is removed from pipeline, pipeline will stop.";
- stop_ = true;
- }
- break;
- case cnstream::StreamMsgType::ERROR_MSG:
- if (source) source->RemoveSources(true);
- stream_set_.clear();
- stop_ = true;
- break;
- case cnstream::StreamMsgType::FRAME_ERR_MSG:
- LOGW(DEMO) << "[" << pipeline_->GetName() << "] received frame error from stream: " << smsg.stream_id
- << ", pts: " << smsg.pts << ".";
- break;
- default:
- LOGE(DEMO) << "[" << pipeline_->GetName() << "] unknown message type.";
- break;
- }
- if (stop_) {
- wakener_.notify_one();
- }
- }
- void WaitForStop() {
- std::unique_lock<std::mutex> lk(mutex_);
- if (stream_set_.empty()) {
- stop_ = true;
- }
- wakener_.wait(lk, [this]() { return stop_.load(); });
- lk.unlock();
- pipeline_->Stop();
- }
- void IncreaseStream(std::string stream_id) {
- std::unique_lock<std::mutex> lk(mutex_);
- if (stream_set_.find(stream_id) != stream_set_.end()) {
- LOGF(DEMO) << "IncreaseStream() The stream is ongoing []" << stream_id;
- }
- stream_set_.insert(stream_id);
- if (stop_) stop_ = false;
- }
- private:
- cnstream::Pipeline *pipeline_ = nullptr;
- std::string source_name_;
- std::atomic<bool> stop_{false};
- std::set<std::string> stream_set_;
- std::condition_variable wakener_;
- mutable std::mutex mutex_;
- };
- int AddSourceForRtspStream(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
- const cnstream::MaximumVideoResolution& maximum_resolution) {
- auto handler = cnstream::RtspHandler::Create(source, stream_id, filename, false, 10, maximum_resolution);
- return source->AddSource(handler);
- }
- int AddSourceForUsbCam(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
- const int &frame_rate, const bool &loop,
- const cnstream::MaximumVideoResolution& maximum_resolution) {
- int ret = -1;
- auto handler = cnstream::FileHandler::Create(source, stream_id, filename, frame_rate, loop, maximum_resolution);
- ret = source->AddSource(handler);
- return ret;
- }
- std::vector<std::future<int>> gFeedMemFutures;
- int AddSourceForVideoInMem(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
- const bool &loop, const cnstream::MaximumVideoResolution& maximum_resolution) {
- FILE *fp = fopen(filename.c_str(), "rb");
- if (!fp) {
- LOGE(DEMO) << "Open file failed. file name : " << filename;
- return -1;
- }
- auto handler = cnstream::ESMemHandler::Create(source, stream_id, maximum_resolution);
- if (source->AddSource(handler)) {
- LOGE(DEMO) << "failed to add " << stream_id;
- return -1;
- }
- gFeedMemFutures.emplace_back(std::async(std::launch::async, [=]() {
- auto memHandler = std::dynamic_pointer_cast<cnstream::ESMemHandler>(handler);
- memHandler->SetDataType(cnstream::ESMemHandler::DataType::H264);
- unsigned char buf[4096];
- while (thread_running.load()) {
- if (!feof(fp)) {
- int size = fread(buf, 1, 4096, fp);
- if (memHandler->Write(buf, size) != 0) {
- break;
- }
- } else if (loop) {
- fseek(fp, 0, SEEK_SET);
- } else {
- break;
- }
- }
- if (!feof(fp)) {
- memHandler->WriteEos();
- } else {
- memHandler->Write(nullptr, 0);
- }
- fclose(fp);
- return 0;
- }));
- return 0;
- }
- int AddSourceForImageInMem(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
- const bool &loop) {
- int index = filename.find_last_of("/");
- std::string dir_path = filename.substr(0, index);
- std::list<std::string> files = GetFileNameFromDir(dir_path, "*.jpg");
- if (files.empty()) {
- LOGE(DEMO) << "there is no jpg files";
- return -1;
- }
- auto handler = cnstream::ESJpegMemHandler::Create(source, stream_id,
- FLAGS_maximum_image_width, FLAGS_maximum_image_height);
- if (source->AddSource(handler)) {
- LOGE(DEMO) << "failed to add " << stream_id;
- return -1;
- }
- gFeedMemFutures.emplace_back(std::async(std::launch::async, [files, handler, loop]() {
- auto memHandler = std::dynamic_pointer_cast<cnstream::ESJpegMemHandler>(handler);
- auto iter = files.begin();
- size_t jpeg_buffer_size = GetFileSize(*iter);
- std::unique_ptr<unsigned char[]> buf(new unsigned char[jpeg_buffer_size]);
- cnstream::ESPacket pkt;
- uint64_t pts = 0;
- while (thread_running.load() && iter != files.end()) {
- size_t file_size = GetFileSize(*iter);
- if (file_size > jpeg_buffer_size) {
- buf.reset(new unsigned char[file_size]);
- jpeg_buffer_size = file_size;
- }
- std::ifstream file_stream(*iter, std::ios::binary);
- if (!file_stream.is_open()) {
- LOGW(DEMO) << "failed to open " << (*iter);
- } else {
- file_stream.read(reinterpret_cast<char *>(buf.get()), file_size);
- pkt.data = buf.get();
- pkt.size = file_size;
- pkt.pts = pts++;
- if (memHandler->Write(&pkt) != 0) {
- break;
- }
- }
- if (++iter == files.end() && loop) {
- iter = files.begin();
- }
- }
- pkt.data = nullptr;
- pkt.size = 0;
- pkt.flags = static_cast<size_t>(cnstream::ESPacket::FLAG::FLAG_EOS);
- memHandler->Write(&pkt);
- return 0;
- }));
- return 0;
- }
- int AddSourceForDecompressedImage(cnstream::DataSource *source, const std::string &stream_id,
- const std::string &filename, const bool &loop, const bool &use_cv_mat) {
- int index = filename.find_last_of("/");
- std::string dir_path = filename.substr(0, index);
- std::list<std::string> files = GetFileNameFromDir(dir_path, "*.jpg");
- if (files.empty()) {
- LOGE(DEMO) << "there is no jpg files";
- return -1;
- }
- auto handler = cnstream::RawImgMemHandler::Create(source, stream_id);
- if (source->AddSource(handler)) {
- LOGE(DEMO) << "failed to add " << stream_id;
- return -1;
- }
- gFeedMemFutures.emplace_back(std::async(std::launch::async, [files, handler, loop, use_cv_mat]() {
- auto memHandler = std::dynamic_pointer_cast<cnstream::RawImgMemHandler>(handler);
- auto iter = files.begin();
- int ret_code = -1;
- uint64_t pts = 0;
- while (thread_running.load() && iter != files.end()) {
- cv::Mat bgr_frame = cv::imread(*iter);
- if (!bgr_frame.empty()) {
- if (use_cv_mat) {
- ret_code = memHandler->Write(&bgr_frame, pts++);
- } else {
- cv::Mat rgb_frame(bgr_frame.rows, bgr_frame.cols, CV_8UC3);
- cv::cvtColor(bgr_frame, rgb_frame, cv::COLOR_BGR2RGB);
- ret_code = memHandler->Write(rgb_frame.data, rgb_frame.cols * rgb_frame.rows * 3, pts++, rgb_frame.cols,
- rgb_frame.rows, cnstream::CNDataFormat::CN_PIXEL_FORMAT_RGB24);
- }
- if (-2 == ret_code) {
- LOGW(DEMO) << "write image failed(invalid data).";
- }
- }
- if (++iter == files.end() && loop) {
- iter = files.begin();
- }
- }
- memHandler->Write(nullptr, 0);
- return 0;
- }));
- return 0;
- }
- int AddSourceForFile(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
- const int &frame_rate, const bool &loop,
- const cnstream::MaximumVideoResolution& maximum_resolution) {
- auto handler = cnstream::FileHandler::Create(source, stream_id, filename, frame_rate, loop, maximum_resolution);
- return source->AddSource(handler);
- }
- int main(int argc, char **argv) {
- gflags::ParseCommandLineFlags(&argc, &argv, false);
- cnstream::InitCNStreamLogging("/home/GSD/log/cnstream/");
- #if 0
- UserLogSink log_listener;
- cnstream::AddLogSink(&log_listener);
- #endif
- LOGI(DEMO) << "CNSTREAM VERSION:" << cnstream::VersionString();
- std::list<std::string> video_urls;
- if (FLAGS_data_name != "") {
- video_urls = {FLAGS_data_name};
- } else {
- video_urls = ::ReadFileList(FLAGS_data_path);
- }
- std::string source_name = "source";
- cnstream::Pipeline pipeline("MyPipeline");
- if (!pipeline.BuildPipelineByJSONFile(FLAGS_config_fname)) {
- LOGE(DEMO) << "Build pipeline failed.";
- return EXIT_FAILURE;
- }
- MsgObserver msg_observer(&pipeline, source_name);
- pipeline.SetStreamMsgObserver(reinterpret_cast<cnstream::StreamMsgObserver *>(&msg_observer));
- cnstream::DataSource *source = dynamic_cast<cnstream::DataSource *>(pipeline.GetModule(source_name));
- if (nullptr == source) {
- LOGE(DEMO) << "DataSource module not found.";
- return EXIT_FAILURE;
- }
- if (!pipeline.Start()) {
- LOGE(DEMO) << "Pipeline start failed.";
- return EXIT_FAILURE;
- }
- std::future<void> perf_print_th_ret;
- int trace_data_file_cnt = 0;
- if (pipeline.IsProfilingEnabled()) {
- perf_print_th_ret = std::async(std::launch::async, [&pipeline, &trace_data_file_cnt] {
- cnstream::Time last_time = cnstream::Clock::now();
- int trace_data_dump_times = 0;
- cnstream::TraceSerializeHelper trace_dumper;
- while (!gstop_perf_print) {
- std::this_thread::sleep_for(std::chrono::seconds(2));
- ::PrintPipelinePerformance("Whole", pipeline.GetProfiler()->GetProfile());
- if (pipeline.IsTracingEnabled()) {
- cnstream::Duration duration(2000);
- ::PrintPipelinePerformance("Last two seconds",
- pipeline.GetProfiler()->GetProfileBefore(cnstream::Clock::now(), duration));
- if (!FLAGS_trace_data_dir.empty()) {
- cnstream::Time now_time = cnstream::Clock::now();
- trace_dumper.Serialize(pipeline.GetTracer()->GetTrace(last_time, now_time));
- last_time = now_time;
- if (++trace_data_dump_times == 10) {
- trace_dumper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data_" +
- std::to_string(trace_data_file_cnt++));
- trace_dumper.Reset();
- trace_data_dump_times = 0;
- }
- }
- }
- }
- if (pipeline.IsTracingEnabled() && !FLAGS_trace_data_dir.empty() && trace_data_dump_times) {
- trace_dumper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data_" + std::to_string(trace_data_file_cnt++));
- trace_dumper.Reset();
- }
- });
- }
- cnstream::MaximumVideoResolution maximum_video_resolution;
- maximum_video_resolution.maximum_width = FLAGS_maximum_video_width;
- maximum_video_resolution.maximum_height = FLAGS_maximum_video_height;
- maximum_video_resolution.enable_variable_resolutions =
- FLAGS_maximum_video_width != -1 && FLAGS_maximum_video_height != -1;
- int streams = static_cast<int>(video_urls.size());
- auto url_iter = video_urls.begin();
- for (int i = 0; i < streams; i++, url_iter++) {
- const std::string &filename = *url_iter;
- std::string stream_id = "stream_" + std::to_string(i);
- int ret = 0;
- std::unique_lock<std::mutex> lk(gmutex_for_add_source);
- if (nullptr != source) {
- if (filename.find("rtsp://") != std::string::npos) {
- ret = AddSourceForRtspStream(source, stream_id, filename, maximum_video_resolution);
- } else if (filename.find("/dev/video") != std::string::npos) {
- ret = AddSourceForUsbCam(source, stream_id, filename, FLAGS_src_frame_rate,
- FLAGS_loop, maximum_video_resolution);
- } else if (filename.find(".jpg") != std::string::npos && FLAGS_jpeg_from_mem) {
- ret = AddSourceForImageInMem(source, stream_id, filename, FLAGS_loop);
- } else if (filename.find(".jpg") != std::string::npos && FLAGS_raw_img_input) {
- ret = AddSourceForDecompressedImage(source, stream_id, filename, FLAGS_loop, FLAGS_use_cv_mat);
- } else if (filename.find(".h264") != std::string::npos) {
- ret = AddSourceForVideoInMem(source, stream_id, filename, FLAGS_loop, maximum_video_resolution);
- } else {
- ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate, FLAGS_loop, maximum_video_resolution);
- }
- }
- if (ret == 0) {
- msg_observer.IncreaseStream(stream_id);
- }
- }
- auto quit_callback = [&pipeline, &source, &msg_observer]() {
- thread_running.store(false);
- for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) {
- gFeedMemFutures[i].wait();
- }
- msg_observer.WaitForStop();
- };
- gdisplayer = dynamic_cast<cnstream::Displayer *>(pipeline.GetModule("displayer"));
- if (gdisplayer && gdisplayer->Show()) {
- gdisplayer->GUILoop(quit_callback);
- #else
- if (false) {
- #endif
- } else {
- if (FLAGS_loop) {
- if (FLAGS_wait_time) {
- std::this_thread::sleep_for(std::chrono::seconds(FLAGS_wait_time));
- LOGI(DEMO) << "run out time and quit...";
- } else {
- getchar();
- LOGI(DEMO) << "receive a character from stdin and quit...";
- }
- thread_running.store(false);
- if (nullptr != source) {
- source->RemoveSources();
- }
- for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) {
- gFeedMemFutures[i].wait();
- }
- msg_observer.WaitForStop();
- } else {
- msg_observer.WaitForStop();
- thread_running.store(false);
- for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) {
- gFeedMemFutures[i].wait();
- }
- }
- }
- gFeedMemFutures.clear();
- cnstream::ShutdownCNStreamLogging();
- if (pipeline.IsProfilingEnabled()) {
- gstop_perf_print = true;
- perf_print_th_ret.get();
- ::PrintPipelinePerformance("Whole", pipeline.GetProfiler()->GetProfile());
- }
- if (pipeline.IsTracingEnabled() && !FLAGS_trace_data_dir.empty()) {
- LOGI(DEMO) << "Wait for trace data merge ...";
- cnstream::TraceSerializeHelper helper;
- for (int file_index = 0; file_index < trace_data_file_cnt; ++file_index) {
- std::string filename = FLAGS_trace_data_dir + "/cnstream_trace_data_" + std::to_string(file_index);
- cnstream::TraceSerializeHelper t;
- cnstream::TraceSerializeHelper::DeserializeFromJSONFile(filename, &t);
- helper.Merge(t);
- remove(filename.c_str());
- }
- if (!helper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data.json")) {
- LOGE(DEMO) << "Dump trace data failed.";
- }
- }
- return EXIT_SUCCESS;
- }