123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- #ifndef CNSTREAM_PIPELINE_HPP_
- #define CNSTREAM_PIPELINE_HPP_
- #include <atomic>
- #include <future>
- #include <iostream>
- #include <memory>
- #include <mutex>
- #include <set>
- #include <string>
- #include <thread>
- #include <unordered_map>
- #include <utility>
- #include <vector>
- #include "cnstream_common.hpp"
- #include "cnstream_config.hpp"
- #include "cnstream_eventbus.hpp"
- #include "cnstream_module.hpp"
- #include "cnstream_source.hpp"
- #include "util/cnstream_rwlock.hpp"
- #include "profiler/pipeline_profiler.hpp"
- namespace cnstream {
- class Connector;
- struct NodeContext;
- template<typename T>
- class CNGraph;
- class IdxManager;
- enum class StreamMsgType {
- EOS_MSG = 0,
- ERROR_MSG,
- STREAM_ERR_MSG,
- FRAME_ERR_MSG,
- USER_MSG0 = 32,
- USER_MSG1,
- USER_MSG2,
- USER_MSG3,
- USER_MSG4,
- USER_MSG5,
- USER_MSG6,
- USER_MSG7,
- USER_MSG8,
- USER_MSG9
- };
- struct StreamMsg {
- StreamMsgType type;
- std::string stream_id;
- std::string module_name;
- int64_t pts = -1;
- };
- class StreamMsgObserver {
- public:
-
- virtual void Update(const StreamMsg& msg) = 0;
-
- virtual ~StreamMsgObserver() = default;
- };
- class Pipeline : private NonCopyable {
- public:
-
- explicit Pipeline(const std::string& name);
-
- virtual ~Pipeline();
-
- const std::string& GetName() const;
-
- bool BuildPipeline(const std::vector<CNModuleConfig>& module_configs,
- const ProfilerConfig& profiler_config = ProfilerConfig());
-
- bool BuildPipeline(const CNGraphConfig& graph_config);
-
- bool BuildPipelineByJSONFile(const std::string& config_file);
-
- bool Start();
-
- bool Stop();
-
- bool IsRunning() const;
-
- Module* GetModule(const std::string& module_name) const;
-
- CNModuleConfig GetModuleConfig(const std::string& module_name) const;
-
- bool IsProfilingEnabled() const;
-
- bool IsTracingEnabled() const;
-
- bool ProvideData(const Module* module, std::shared_ptr<CNFrameInfo> data);
-
- EventBus* GetEventBus() const;
-
- void SetStreamMsgObserver(StreamMsgObserver* observer);
-
- StreamMsgObserver* GetStreamMsgObserver() const;
-
- PipelineProfiler* GetProfiler() const;
-
- PipelineTracer* GetTracer() const;
-
- bool IsRootNode(const std::string& module_name) const;
-
- bool IsLeafNode(const std::string& module_name) const;
-
- void RegisterFrameDoneCallBack(const std::function<void(std::shared_ptr<CNFrameInfo>)>& callback);
- private:
-
- bool CreateModules();
- void GenerateModulesMask();
- bool CreateConnectors();
-
- bool PassedByAllModules(uint64_t mask) const;
- void OnProcessStart(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
- void OnProcessEnd(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
- void OnProcessFailed(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data, int ret);
- void OnDataInvalid(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
- void OnEos(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
- void OnPassThrough(const std::shared_ptr<CNFrameInfo>& data);
- void TransmitData(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
- void TaskLoop(NodeContext* context, uint32_t conveyor_idx);
- EventHandleFlag DefaultBusWatch(const Event& event);
- void UpdateByStreamMsg(const StreamMsg& msg);
- void StreamMsgHandleFunc();
- std::unique_ptr<CNGraph<NodeContext>> graph_;
- std::string name_;
- std::atomic<bool> running_{false};
- std::unique_ptr<EventBus> event_bus_ = nullptr;
- std::unique_ptr<IdxManager> idxManager_ = nullptr;
- std::vector<std::thread> threads_;
-
- ThreadSafeQueue<StreamMsg> msgq_;
- std::thread smsg_thread_;
- StreamMsgObserver* smsg_observer_ = nullptr;
- std::atomic<bool> exit_msg_loop_{false};
- uint64_t all_modules_mask_ = 0;
- std::unique_ptr<PipelineProfiler> profiler_;
- std::function<void(std::shared_ptr<CNFrameInfo>)> frame_done_cb_ = NULL;
-
- friend class Module;
- friend class SourceModule;
- uint32_t GetStreamIndex(const std::string& stream_id) {
- if (idxManager_) {
- return idxManager_->GetStreamIndex(stream_id);
- }
- return INVALID_STREAM_IDX;
- }
- void ReturnStreamIndex(const std::string& stream_id) {
- if (idxManager_) {
- idxManager_->ReturnStreamIndex(stream_id);
- }
- }
- size_t GetModuleIdx() {
- if (idxManager_) {
- return idxManager_->GetModuleIdx();
- }
- return INVALID_MODULE_ID;
- }
- void ReturnModuleIdx(size_t idx) {
- if (idxManager_) {
- idxManager_->ReturnModuleIdx(idx);
- }
- }
- };
- inline const std::string& Pipeline::GetName() const {
- return name_;
- }
- inline bool Pipeline::BuildPipeline(const std::vector<CNModuleConfig>& module_configs,
- const ProfilerConfig& profiler_config) {
- CNGraphConfig graph_config;
- graph_config.name = GetName();
- graph_config.module_configs = module_configs;
- graph_config.profiler_config = profiler_config;
- return BuildPipeline(graph_config);
- }
- inline bool Pipeline::BuildPipelineByJSONFile(const std::string& config_file) {
- CNGraphConfig graph_config;
- if (!graph_config.ParseByJSONFile(config_file)) {
- LOGE(CORE) << "Parse graph config file failed.";
- return false;
- }
- return BuildPipeline(graph_config);
- }
- inline bool Pipeline::IsRunning() const {
- return running_;
- }
- inline EventBus* Pipeline::GetEventBus() const {
- return event_bus_.get();
- }
- inline void Pipeline::SetStreamMsgObserver(StreamMsgObserver* observer) {
- smsg_observer_ = observer;
- }
- inline StreamMsgObserver* Pipeline::GetStreamMsgObserver() const {
- return smsg_observer_;
- }
- inline bool Pipeline::IsProfilingEnabled() const {
- return profiler_ ? profiler_->GetConfig().enable_profiling : false;
- }
- inline bool Pipeline::IsTracingEnabled() const {
- return profiler_ ? profiler_->GetConfig().enable_tracing : false;
- }
- inline PipelineProfiler* Pipeline::GetProfiler() const {
- return IsProfilingEnabled() ? profiler_.get() : nullptr;
- }
- inline PipelineTracer* Pipeline::GetTracer() const {
- return IsTracingEnabled() ? profiler_->GetTracer() : nullptr;
- }
- inline bool Pipeline::PassedByAllModules(uint64_t mask) const {
- return mask == all_modules_mask_;
- }
- inline void Pipeline::RegisterFrameDoneCallBack(const std::function<void(std::shared_ptr<CNFrameInfo>)>& callback) {
- frame_done_cb_ = callback;
- }
- }
- #endif
|