/************************************************************************* * Copyright (C) [2019] by Cambricon, Inc. All rights reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. *************************************************************************/ #include #include #include #if (CV_MAJOR_VERSION >= 3) #include #endif #include #include #include #include #include #include #include #include #include #include #include #include "cnstream_version.hpp" #include "data_source.hpp" #ifdef HAVE_DISPLAY #include "displayer.hpp" #endif #include "cnstream_logging.hpp" #include "util.hpp" #include "profiler/pipeline_profiler.hpp" #include "profiler/profile.hpp" #include "profiler/trace_serialize_helper.hpp" DEFINE_string(data_path, "", "video file list."); DEFINE_string(data_name, "", "video file name."); DEFINE_int32(src_frame_rate, 25, "frame rate for send data"); DEFINE_int32(maximum_video_width, -1, "maximum video width, for variable video resolutions, " "not supported on MLU220/MLU270"); DEFINE_int32(maximum_video_height, -1, "maximum video height, for variable video resolutions, " "not supported on MLU220/MLU270"); DEFINE_int32(maximum_image_width, 7680, "maximum image width, valid when jpeg_from_mem is true"); DEFINE_int32(maximum_image_height, 4320, "maxmum image width, valid when jpeg_from_mem is true"); DEFINE_int32(wait_time, 0, "time of one test case"); DEFINE_bool(loop, false, "display repeat"); DEFINE_string(config_fname, "", "pipeline config filename"); DEFINE_bool(jpeg_from_mem, true, "Jpeg bitstream from mem."); DEFINE_bool(raw_img_input, false, "feed decompressed image to source"); DEFINE_bool(use_cv_mat, true, "feed cv mat to source. It is valid only if ``raw_img_input`` is set to true"); DEFINE_string(trace_data_dir, "", "dump trace data to specified dir. An empty string means that no data is stored"); #ifdef HAVE_DISPLAY cnstream::Displayer *gdisplayer = nullptr; #endif std::atomic thread_running{true}; std::atomic gstop_perf_print{false}; std::mutex gmutex_for_add_source; class UserLogSink : public cnstream::LogSink { public: void Send(cnstream::LogSeverity severity, const char *category, const char *filename, int line, const struct ::tm *tm_time, int32_t usecs, const char *message, size_t message_len) override { std::cout << "UserLogSink: " << ToString(severity, category, filename, line, tm_time, usecs, message, message_len) << std::endl; } }; class MsgObserver : cnstream::StreamMsgObserver { public: MsgObserver(cnstream::Pipeline *pipeline, std::string source_name) : pipeline_(pipeline), source_name_(source_name) {} void Update(const cnstream::StreamMsg &smsg) override { std::lock_guard add_src_lg(gmutex_for_add_source); std::lock_guard lg(mutex_); if (stop_) return; cnstream::DataSource *source = nullptr; source = dynamic_cast(pipeline_->GetModule(source_name_)); switch (smsg.type) { case cnstream::StreamMsgType::EOS_MSG: LOGI(DEMO) << "[" << pipeline_->GetName() << "] received EOS message from stream: [" << smsg.stream_id << "]"; if (stream_set_.find(smsg.stream_id) != stream_set_.end()) { if (source) source->RemoveSource(smsg.stream_id); stream_set_.erase(smsg.stream_id); } if (stream_set_.empty()) { LOGI(DEMO) << "[" << pipeline_->GetName() << "] received all EOS"; stop_ = true; } break; case cnstream::StreamMsgType::STREAM_ERR_MSG: LOGW(DEMO) << "[" << pipeline_->GetName() << "] received stream error from stream: " << smsg.stream_id << ", remove it from pipeline."; if (stream_set_.find(smsg.stream_id) != stream_set_.end()) { if (source) source->RemoveSource(smsg.stream_id, true); stream_set_.erase(smsg.stream_id); } if (stream_set_.empty()) { LOGI(DEMO) << "[" << pipeline_->GetName() << "] all streams is removed from pipeline, pipeline will stop."; stop_ = true; } break; case cnstream::StreamMsgType::ERROR_MSG: if (source) source->RemoveSources(true); stream_set_.clear(); stop_ = true; break; case cnstream::StreamMsgType::FRAME_ERR_MSG: LOGW(DEMO) << "[" << pipeline_->GetName() << "] received frame error from stream: " << smsg.stream_id << ", pts: " << smsg.pts << "."; break; default: LOGE(DEMO) << "[" << pipeline_->GetName() << "] unknown message type."; break; } if (stop_) { wakener_.notify_one(); } } void WaitForStop() { std::unique_lock lk(mutex_); if (stream_set_.empty()) { stop_ = true; } wakener_.wait(lk, [this]() { return stop_.load(); }); lk.unlock(); pipeline_->Stop(); } void IncreaseStream(std::string stream_id) { std::unique_lock lk(mutex_); if (stream_set_.find(stream_id) != stream_set_.end()) { LOGF(DEMO) << "IncreaseStream() The stream is ongoing []" << stream_id; } stream_set_.insert(stream_id); if (stop_) stop_ = false; } private: cnstream::Pipeline *pipeline_ = nullptr; std::string source_name_; std::atomic stop_{false}; std::set stream_set_; std::condition_variable wakener_; mutable std::mutex mutex_; }; int AddSourceForRtspStream(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename, const cnstream::MaximumVideoResolution& maximum_resolution) { auto handler = cnstream::RtspHandler::Create(source, stream_id, filename, false, 10, maximum_resolution); return source->AddSource(handler); } /** * @brief Adds source for usb camera. * * @note Supports Logitech C505e 720P USB CAMERA on Ubuntu 18.04.5. * Required steps: * 1. Compiles x264 * git clone https://code.videolan.org/videolan/x264.git * cd x264 * ./configure --enable-shared --prefix=/usr/local/x264 --disable-asm * make * sudo make install * 2. Compiles ffmpeg * wget http://www.ffmpeg.org/releases/ffmpeg-3.4.8.tar.xz * tar xvf ffmpeg-3.4.8.tar.xz * cd ffmpeg-3.4.8 * export PKG_CONFIG_PATH=/usr/local/x264/lib/pkgconfig * ./configure \ * --prefix=/usr/local/ \ * --enable-shared \ * --enable-static \ * --enable-gpl \ * --enable-nonfree \ * --enable-ffmpeg \ * --disable-ffplay \ * --enable-swscale \ * --pkg-config="pkg-config --static" \ * --enable-pthreads \ * --disable-armv5te \ * --disable-armv6 \ * --disable-armv6t2 \ * --disable-yasm \ * --disable-stripping \ * --enable-libx264 \ * --enable-libv4l2 \ * --extra-cflags=-I/usr/local/x264/include \ * --extra-ldflags=-L/usr/local/x264/lib * make -j4 * sudo make install * 3. Modifies modules/CMakeLists.txt * turns WITH_FFMPEG_AVDEVICE into ON * 4. Compiles cnstream * 5. Runs the demo of usb camera * cd samples/cns_launcher/object_detection * export LD_LIBRARY_PATH=/usr/local/x264/lib:$LD_LIBRARY_PATH * run.sh [mlu220/mlu270] [encode_jpeg/encode_video/display/rtsp] usb * * And the support of other types of USB CAMERA and operate system is not tested. * Above steps may be a reference. */ int AddSourceForUsbCam(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename, const int &frame_rate, const bool &loop, const cnstream::MaximumVideoResolution& maximum_resolution) { int ret = -1; auto handler = cnstream::FileHandler::Create(source, stream_id, filename, frame_rate, loop, maximum_resolution); ret = source->AddSource(handler); return ret; } std::vector> gFeedMemFutures; int AddSourceForVideoInMem(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename, const bool &loop, const cnstream::MaximumVideoResolution& maximum_resolution) { FILE *fp = fopen(filename.c_str(), "rb"); if (!fp) { LOGE(DEMO) << "Open file failed. file name : " << filename; return -1; } auto handler = cnstream::ESMemHandler::Create(source, stream_id, maximum_resolution); if (source->AddSource(handler)) { LOGE(DEMO) << "failed to add " << stream_id; return -1; } // Start another thread to read file's binary data into memory and feed it to pipeline. gFeedMemFutures.emplace_back(std::async(std::launch::async, [=]() { auto memHandler = std::dynamic_pointer_cast(handler); memHandler->SetDataType(cnstream::ESMemHandler::DataType::H264); unsigned char buf[4096]; while (thread_running.load()) { if (!feof(fp)) { int size = fread(buf, 1, 4096, fp); if (memHandler->Write(buf, size) != 0) { break; } } else if (loop) { fseek(fp, 0, SEEK_SET); } else { break; } } if (!feof(fp)) { memHandler->WriteEos(); } else { memHandler->Write(nullptr, 0); } fclose(fp); return 0; })); return 0; } int AddSourceForImageInMem(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename, const bool &loop) { int index = filename.find_last_of("/"); std::string dir_path = filename.substr(0, index); std::list files = GetFileNameFromDir(dir_path, "*.jpg"); if (files.empty()) { LOGE(DEMO) << "there is no jpg files"; return -1; } auto handler = cnstream::ESJpegMemHandler::Create(source, stream_id, FLAGS_maximum_image_width, FLAGS_maximum_image_height); if (source->AddSource(handler)) { LOGE(DEMO) << "failed to add " << stream_id; return -1; } // Start another thread to read file's binary data into memory and feed it to pipeline. gFeedMemFutures.emplace_back(std::async(std::launch::async, [files, handler, loop]() { auto memHandler = std::dynamic_pointer_cast(handler); auto iter = files.begin(); size_t jpeg_buffer_size = GetFileSize(*iter); std::unique_ptr buf(new unsigned char[jpeg_buffer_size]); cnstream::ESPacket pkt; uint64_t pts = 0; while (thread_running.load() && iter != files.end()) { size_t file_size = GetFileSize(*iter); if (file_size > jpeg_buffer_size) { buf.reset(new unsigned char[file_size]); jpeg_buffer_size = file_size; } std::ifstream file_stream(*iter, std::ios::binary); if (!file_stream.is_open()) { LOGW(DEMO) << "failed to open " << (*iter); } else { file_stream.read(reinterpret_cast(buf.get()), file_size); pkt.data = buf.get(); pkt.size = file_size; pkt.pts = pts++; if (memHandler->Write(&pkt) != 0) { break; } } if (++iter == files.end() && loop) { iter = files.begin(); } } pkt.data = nullptr; pkt.size = 0; pkt.flags = static_cast(cnstream::ESPacket::FLAG::FLAG_EOS); memHandler->Write(&pkt); return 0; })); return 0; } int AddSourceForDecompressedImage(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename, const bool &loop, const bool &use_cv_mat) { // The following code is only for image input. For video input, you could use OpenCV VideoCapture. int index = filename.find_last_of("/"); std::string dir_path = filename.substr(0, index); std::list files = GetFileNameFromDir(dir_path, "*.jpg"); if (files.empty()) { LOGE(DEMO) << "there is no jpg files"; return -1; } auto handler = cnstream::RawImgMemHandler::Create(source, stream_id); if (source->AddSource(handler)) { LOGE(DEMO) << "failed to add " << stream_id; return -1; } // Start another thread to read data from files to cv mat and feed data to pipeline. gFeedMemFutures.emplace_back(std::async(std::launch::async, [files, handler, loop, use_cv_mat]() { auto memHandler = std::dynamic_pointer_cast(handler); auto iter = files.begin(); int ret_code = -1; uint64_t pts = 0; while (thread_running.load() && iter != files.end()) { cv::Mat bgr_frame = cv::imread(*iter); if (!bgr_frame.empty()) { if (use_cv_mat) { // feed bgr24 image mat, with api-Write(cv::Mat) ret_code = memHandler->Write(&bgr_frame, pts++); } else { // feed rgb24 image data, with api-Write(unsigned char* data, int size, int w, int h, cnstream::CNDataFormat) cv::Mat rgb_frame(bgr_frame.rows, bgr_frame.cols, CV_8UC3); cv::cvtColor(bgr_frame, rgb_frame, cv::COLOR_BGR2RGB); ret_code = memHandler->Write(rgb_frame.data, rgb_frame.cols * rgb_frame.rows * 3, pts++, rgb_frame.cols, rgb_frame.rows, cnstream::CNDataFormat::CN_PIXEL_FORMAT_RGB24); } if (-2 == ret_code) { LOGW(DEMO) << "write image failed(invalid data)."; } } if (++iter == files.end() && loop) { iter = files.begin(); } } memHandler->Write(nullptr, 0); return 0; })); return 0; } int AddSourceForFile(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename, const int &frame_rate, const bool &loop, const cnstream::MaximumVideoResolution& maximum_resolution) { auto handler = cnstream::FileHandler::Create(source, stream_id, filename, frame_rate, loop, maximum_resolution); return source->AddSource(handler); } int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, false); cnstream::InitCNStreamLogging("/home/GSD/log/cnstream/"); #if 0 UserLogSink log_listener; cnstream::AddLogSink(&log_listener); #endif LOGI(DEMO) << "CNSTREAM VERSION:" << cnstream::VersionString(); /* flags to variables */ std::list video_urls; if (FLAGS_data_name != "") { video_urls = {FLAGS_data_name}; } else { video_urls = ::ReadFileList(FLAGS_data_path); } std::string source_name = "source"; // source module name, which is defined in pipeline json config /* build pipeline */ cnstream::Pipeline pipeline("MyPipeline"); if (!pipeline.BuildPipelineByJSONFile(FLAGS_config_fname)) { LOGE(DEMO) << "Build pipeline failed."; return EXIT_FAILURE; } /* message observer */ MsgObserver msg_observer(&pipeline, source_name); pipeline.SetStreamMsgObserver(reinterpret_cast(&msg_observer)); /* find data source */ cnstream::DataSource *source = dynamic_cast(pipeline.GetModule(source_name)); if (nullptr == source) { LOGE(DEMO) << "DataSource module not found."; return EXIT_FAILURE; } /* start pipeline */ if (!pipeline.Start()) { LOGE(DEMO) << "Pipeline start failed."; return EXIT_FAILURE; } /* start print performance informations */ std::future perf_print_th_ret; int trace_data_file_cnt = 0; if (pipeline.IsProfilingEnabled()) { perf_print_th_ret = std::async(std::launch::async, [&pipeline, &trace_data_file_cnt] { cnstream::Time last_time = cnstream::Clock::now(); int trace_data_dump_times = 0; cnstream::TraceSerializeHelper trace_dumper; while (!gstop_perf_print) { std::this_thread::sleep_for(std::chrono::seconds(2)); ::PrintPipelinePerformance("Whole", pipeline.GetProfiler()->GetProfile()); if (pipeline.IsTracingEnabled()) { cnstream::Duration duration(2000); ::PrintPipelinePerformance("Last two seconds", pipeline.GetProfiler()->GetProfileBefore(cnstream::Clock::now(), duration)); if (!FLAGS_trace_data_dir.empty()) { cnstream::Time now_time = cnstream::Clock::now(); trace_dumper.Serialize(pipeline.GetTracer()->GetTrace(last_time, now_time)); last_time = now_time; if (++trace_data_dump_times == 10) { trace_dumper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data_" + std::to_string(trace_data_file_cnt++)); trace_dumper.Reset(); trace_data_dump_times = 0; } } } } if (pipeline.IsTracingEnabled() && !FLAGS_trace_data_dir.empty() && trace_data_dump_times) { trace_dumper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data_" + std::to_string(trace_data_file_cnt++)); trace_dumper.Reset(); } }); } /* add stream sources... */ cnstream::MaximumVideoResolution maximum_video_resolution; maximum_video_resolution.maximum_width = FLAGS_maximum_video_width; maximum_video_resolution.maximum_height = FLAGS_maximum_video_height; maximum_video_resolution.enable_variable_resolutions = FLAGS_maximum_video_width != -1 && FLAGS_maximum_video_height != -1; int streams = static_cast(video_urls.size()); auto url_iter = video_urls.begin(); for (int i = 0; i < streams; i++, url_iter++) { const std::string &filename = *url_iter; std::string stream_id = "stream_" + std::to_string(i); int ret = 0; std::unique_lock lk(gmutex_for_add_source); if (nullptr != source) { if (filename.find("rtsp://") != std::string::npos) { ret = AddSourceForRtspStream(source, stream_id, filename, maximum_video_resolution); } else if (filename.find("/dev/video") != std::string::npos) { // only support linux ret = AddSourceForUsbCam(source, stream_id, filename, FLAGS_src_frame_rate, FLAGS_loop, maximum_video_resolution); } else if (filename.find(".jpg") != std::string::npos && FLAGS_jpeg_from_mem) { ret = AddSourceForImageInMem(source, stream_id, filename, FLAGS_loop); } else if (filename.find(".jpg") != std::string::npos && FLAGS_raw_img_input) { ret = AddSourceForDecompressedImage(source, stream_id, filename, FLAGS_loop, FLAGS_use_cv_mat); } else if (filename.find(".h264") != std::string::npos) { ret = AddSourceForVideoInMem(source, stream_id, filename, FLAGS_loop, maximum_video_resolution); } else { ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate, FLAGS_loop, maximum_video_resolution); } } if (ret == 0) { msg_observer.IncreaseStream(stream_id); } } #ifdef HAVE_DISPLAY auto quit_callback = [&pipeline, &source, &msg_observer]() { // stop feed-data threads before remove-sources... thread_running.store(false); for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) { gFeedMemFutures[i].wait(); } msg_observer.WaitForStop(); }; gdisplayer = dynamic_cast(pipeline.GetModule("displayer")); if (gdisplayer && gdisplayer->Show()) { gdisplayer->GUILoop(quit_callback); #else if (false) { #endif } else { /* * close pipeline */ if (FLAGS_loop) { // stop by hand or by FLAGS_wait_time if (FLAGS_wait_time) { std::this_thread::sleep_for(std::chrono::seconds(FLAGS_wait_time)); LOGI(DEMO) << "run out time and quit..."; } else { getchar(); LOGI(DEMO) << "receive a character from stdin and quit..."; } thread_running.store(false); if (nullptr != source) { source->RemoveSources(); } for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) { gFeedMemFutures[i].wait(); } msg_observer.WaitForStop(); } else { // stop automatically msg_observer.WaitForStop(); thread_running.store(false); for (unsigned int i = 0; i < gFeedMemFutures.size(); i++) { gFeedMemFutures[i].wait(); } } } gFeedMemFutures.clear(); cnstream::ShutdownCNStreamLogging(); if (pipeline.IsProfilingEnabled()) { gstop_perf_print = true; perf_print_th_ret.get(); ::PrintPipelinePerformance("Whole", pipeline.GetProfiler()->GetProfile()); } if (pipeline.IsTracingEnabled() && !FLAGS_trace_data_dir.empty()) { LOGI(DEMO) << "Wait for trace data merge ..."; cnstream::TraceSerializeHelper helper; for (int file_index = 0; file_index < trace_data_file_cnt; ++file_index) { std::string filename = FLAGS_trace_data_dir + "/cnstream_trace_data_" + std::to_string(file_index); cnstream::TraceSerializeHelper t; cnstream::TraceSerializeHelper::DeserializeFromJSONFile(filename, &t); helper.Merge(t); remove(filename.c_str()); } if (!helper.ToFile(FLAGS_trace_data_dir + "/cnstream_trace_data.json")) { LOGE(DEMO) << "Dump trace data failed."; } } return EXIT_SUCCESS; }