cnstream_pipeline.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. /*************************************************************************
  2. * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #ifndef CNSTREAM_PIPELINE_HPP_
  21. #define CNSTREAM_PIPELINE_HPP_
  22. /**
  23. * @file cnstream_pipeline.hpp
  24. *
  25. * This file contains a declaration of the Pipeline class.
  26. */
  27. #include <atomic>
  28. #include <future>
  29. #include <iostream>
  30. #include <memory>
  31. #include <mutex>
  32. #include <set>
  33. #include <string>
  34. #include <thread>
  35. #include <unordered_map>
  36. #include <utility>
  37. #include <vector>
  38. #include "cnstream_common.hpp"
  39. #include "cnstream_config.hpp"
  40. #include "cnstream_eventbus.hpp"
  41. #include "cnstream_module.hpp"
  42. #include "cnstream_source.hpp"
  43. #include "util/cnstream_rwlock.hpp"
  44. #include "profiler/pipeline_profiler.hpp"
  45. namespace cnstream {
  46. class Connector;
  47. struct NodeContext;
  48. template<typename T>
  49. class CNGraph;
  50. class IdxManager;
  51. /**
  52. * @enum StreamMsgType
  53. *
  54. * @brief Enumeration variables describing the data stream message type.
  55. */
  56. enum class StreamMsgType {
  57. EOS_MSG = 0, /*!< The end of a stream message. The stream has received EOS message in all modules. */
  58. ERROR_MSG, /*!< An error message. The stream process has failed in one of the modules. */
  59. STREAM_ERR_MSG, /*!< Stream error message. */
  60. FRAME_ERR_MSG, /*!< Frame error message. */
  61. USER_MSG0 = 32, /*!< Reserved message. You can define your own messages. */
  62. USER_MSG1, /*!< Reserved message. You can define your own messages. */
  63. USER_MSG2, /*!< Reserved message. You can define your own messages. */
  64. USER_MSG3, /*!< Reserved message. You can define your own messages. */
  65. USER_MSG4, /*!< Reserved message. You can define your own messages. */
  66. USER_MSG5, /*!< Reserved message. You can define your own messages. */
  67. USER_MSG6, /*!< Reserved message. You can define your own messages. */
  68. USER_MSG7, /*!< Reserved message. You can define your own messages. */
  69. USER_MSG8, /*!< Reserved message. You can define your own messages. */
  70. USER_MSG9 /*!< Reserved message. You can define your own messages. */
  71. }; // enum StreamMsg
  72. /**
  73. * @struct StreamMsg
  74. *
  75. * @brief The StreamMsg is a structure holding the information of a stream message.
  76. *
  77. * @see StreamMsgType.
  78. */
  79. struct StreamMsg {
  80. StreamMsgType type; /*!< The type of a message. */
  81. std::string stream_id; /*!< Stream ID, set in CNFrameInfo::stream_id. */
  82. std::string module_name; /*!< The module that posts this event. */
  83. int64_t pts = -1; /*!< The PTS (Presentation Timestamp) of this frame. */
  84. };
  85. /**
  86. * @class StreamMsgObserver
  87. *
  88. * @brief Receives stream messages from a pipeline.
  89. * To receive stream messages from the pipeline, you can define a class to inherit the
  90. * StreamMsgObserver class and call the ``Update`` function. The
  91. * observer instance is bounded to the pipeline using the Pipeline::SetStreamMsgObserver function .
  92. *
  93. * @see Pipeline::SetStreamMsgObserver StreamMsg StreamMsgType.
  94. */
  95. class StreamMsgObserver {
  96. public:
  97. /**
  98. * @brief Receives stream messages from a pipeline passively.
  99. *
  100. * @param[in] msg The stream message from a pipeline.
  101. *
  102. * @return No return value.
  103. */
  104. virtual void Update(const StreamMsg& msg) = 0;
  105. /**
  106. * @brief Default destructor to destruct stream message observer.
  107. *
  108. * @return No return value.
  109. */
  110. virtual ~StreamMsgObserver() = default;
  111. }; // class StreamMsgObserver
  112. /**
  113. * @class Pipeline
  114. *
  115. * @brief Pipeline is the manager of the modules, which manages data transmission between modules and controls messages delivery.
  116. */
  117. class Pipeline : private NonCopyable {
  118. public:
  119. /**
  120. * @brief A constructor to construct one pipeline.
  121. *
  122. * @param[in] name The name of the pipeline.
  123. *
  124. * @return No return value.
  125. */
  126. explicit Pipeline(const std::string& name);
  127. /**
  128. * @brief A destructor to destruct one pipeline.
  129. *
  130. * @param[in] name The name of the pipeline.
  131. *
  132. * @return No return value.
  133. */
  134. virtual ~Pipeline();
  135. /**
  136. * @brief Gets the pipeline's name.
  137. *
  138. * @return Returns the pipeline's name.
  139. */
  140. const std::string& GetName() const;
  141. /**
  142. * @brief Builds a pipeline by module configurations.
  143. *
  144. * @param[in] module_configs The configurations of a module.
  145. * @param[in] profiler_config The configuration of a profiler.
  146. *
  147. * @return Returns true if this function has run successfully. Otherwise, returns false.
  148. */
  149. bool BuildPipeline(const std::vector<CNModuleConfig>& module_configs,
  150. const ProfilerConfig& profiler_config = ProfilerConfig());
  151. /**
  152. * @brief Builds a pipeline by graph configuration.
  153. *
  154. * @param[in] graph_config The configuration of a graph.
  155. *
  156. * @return Returns true if this function has run successfully. Otherwise, returns false.
  157. */
  158. bool BuildPipeline(const CNGraphConfig& graph_config);
  159. /**
  160. * @brief Builds a pipeline from a JSON file.
  161. * You can learn to write a configuration file by looking at the description of CNGraphConfig.
  162. *
  163. * @see CNGraphConfig
  164. *
  165. * @param[in] config_file The configuration file in JSON format.
  166. *
  167. * @return Returns true if this function has run successfully. Otherwise, returns false.
  168. *
  169. */
  170. bool BuildPipelineByJSONFile(const std::string& config_file);
  171. /**
  172. * @brief Starts a pipeline.
  173. * Starts data transmission in a pipeline.
  174. * Calls the ``Open`` function for all modules. See Module::Open.
  175. *
  176. * @return Returns true if this function has run successfully. Returns false if the ``Open``
  177. * function did not run successfully in one of the modules, or the link modules failed.
  178. */
  179. bool Start();
  180. /**
  181. * @brief Stops data transmissions in a pipeline.
  182. *
  183. * @return Returns true if this function has run successfully. Otherwise, returns false.
  184. */
  185. bool Stop();
  186. /**
  187. * @brief The running status of a pipeline.
  188. *
  189. * @return Returns true if the pipeline is running. Returns false if the pipeline is not running.
  190. */
  191. bool IsRunning() const;
  192. /**
  193. * @brief Gets a module in current pipeline by name.
  194. *
  195. * @param[in] module_name The module name specified in the module configuration.
  196. * If you specify a module name written in the module configuration, the first module with the same name as
  197. * the specified module name in the order of DFS will be returned.
  198. * When there are modules with the same name as other graphs in the subgraph, you can also find the
  199. * module by adding the graph name prefix divided by slash. eg. pipeline_name/subgraph1/module1.
  200. *
  201. * @return Returns the module pointer if the module has been added to
  202. * the current pipeline. Otherwise, returns nullptr.
  203. */
  204. Module* GetModule(const std::string& module_name) const;
  205. /**
  206. * @brief Gets the module configuration by the module name.
  207. *
  208. * @param[in] module_name The module name specified in module configuration.
  209. * The module name can be specified by two ways, see Pipeline::GetModule for detail.
  210. *
  211. * @return Returns module configuration if this function has run successfully.
  212. * Returns NULL if the module specified by ``module_name`` has not been
  213. * added to the current pipeline.
  214. */
  215. CNModuleConfig GetModuleConfig(const std::string& module_name) const;
  216. /**
  217. * @brief Checks if profiling is enabled.
  218. *
  219. * @return Returns true if profiling is enabled.
  220. **/
  221. bool IsProfilingEnabled() const;
  222. /**
  223. * @brief Checks if tracing is enabled.
  224. *
  225. * @return Returns true if tracing is enabled.
  226. **/
  227. bool IsTracingEnabled() const;
  228. /**
  229. * @brief Provides data for the pipeline that is used in source module or the module transmitted by itself.
  230. *
  231. * @param[in] module The module that provides data.
  232. * @param[in] data The data that is transmitted to the pipeline.
  233. *
  234. * @return Returns true if this function has run successfully. Returns false if the module
  235. * is not added in the pipeline or the pipeline has been stopped.
  236. *
  237. * @note ProvideData can be only called by the head modules in pipeline. A head module means the module
  238. * has no parent modules.
  239. *
  240. * @see Module::Process.
  241. */
  242. bool ProvideData(const Module* module, std::shared_ptr<CNFrameInfo> data);
  243. /**
  244. * @brief Gets the event bus in the pipeline.
  245. *
  246. * @return Returns the event bus.
  247. */
  248. EventBus* GetEventBus() const;
  249. /**
  250. * @brief Binds the stream message observer with a pipeline to receive stream message from this pipeline.
  251. *
  252. * @param[in] observer The stream message observer.
  253. *
  254. * @return No return value.
  255. *
  256. * @see StreamMsgObserver.
  257. */
  258. void SetStreamMsgObserver(StreamMsgObserver* observer);
  259. /**
  260. * @brief Gets the stream message observer that has been bound with this pipeline.
  261. *
  262. * @return Returns the stream message observer that has been bound with this pipeline.
  263. *
  264. * @see Pipeline::SetStreamMsgObserver.
  265. */
  266. StreamMsgObserver* GetStreamMsgObserver() const;
  267. /**
  268. * @brief Gets this pipeline's profiler.
  269. *
  270. * @return Returns profiler.
  271. */
  272. PipelineProfiler* GetProfiler() const;
  273. /**
  274. * @brief Gets this pipeline's tracer.
  275. *
  276. * @return Returns tracer.
  277. */
  278. PipelineTracer* GetTracer() const;
  279. /**
  280. * @brief Checks if module is root node of pipeline or not.
  281. * The module name can be specified by two ways, see Pipeline::GetModule for detail.
  282. *
  283. * @param[in] module_name module name.
  284. *
  285. * @return Returns true if it's root node, otherwise returns false.
  286. **/
  287. bool IsRootNode(const std::string& module_name) const;
  288. /**
  289. * @brief Checks if module is leaf node of pipeline.
  290. * The module name can be specified by two ways, see Pipeline::GetModule for detail.
  291. *
  292. * @param[in] module_name module name.
  293. *
  294. * @return Returns true if it's leaf node, otherwise returns false.
  295. **/
  296. bool IsLeafNode(const std::string& module_name) const;
  297. /**
  298. * @brief Registers a callback to be called after the frame process is done.
  299. *
  300. * @param[in] callback The call back function.
  301. *
  302. * @return No return value.
  303. *
  304. */
  305. void RegisterFrameDoneCallBack(const std::function<void(std::shared_ptr<CNFrameInfo>)>& callback);
  306. private:
  307. /** called by BuildPipeline **/
  308. bool CreateModules();
  309. void GenerateModulesMask();
  310. bool CreateConnectors();
  311. /* ------Internal methods------ */
  312. bool PassedByAllModules(uint64_t mask) const;
  313. void OnProcessStart(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
  314. void OnProcessEnd(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
  315. void OnProcessFailed(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data, int ret);
  316. void OnDataInvalid(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
  317. void OnEos(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
  318. void OnPassThrough(const std::shared_ptr<CNFrameInfo>& data);
  319. void TransmitData(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data);
  320. void TaskLoop(NodeContext* context, uint32_t conveyor_idx);
  321. EventHandleFlag DefaultBusWatch(const Event& event);
  322. void UpdateByStreamMsg(const StreamMsg& msg);
  323. void StreamMsgHandleFunc();
  324. std::unique_ptr<CNGraph<NodeContext>> graph_;
  325. std::string name_;
  326. std::atomic<bool> running_{false};
  327. std::unique_ptr<EventBus> event_bus_ = nullptr;
  328. std::unique_ptr<IdxManager> idxManager_ = nullptr;
  329. std::vector<std::thread> threads_;
  330. // message observer members
  331. ThreadSafeQueue<StreamMsg> msgq_;
  332. std::thread smsg_thread_;
  333. StreamMsgObserver* smsg_observer_ = nullptr;
  334. std::atomic<bool> exit_msg_loop_{false};
  335. uint64_t all_modules_mask_ = 0;
  336. std::unique_ptr<PipelineProfiler> profiler_;
  337. std::function<void(std::shared_ptr<CNFrameInfo>)> frame_done_cb_ = NULL;
  338. /**
  339. * StreamIdx helpers for SourceModule instances.
  340. * ModuleIdx helpers for Module instances
  341. */
  342. friend class Module;
  343. friend class SourceModule;
  344. uint32_t GetStreamIndex(const std::string& stream_id) {
  345. if (idxManager_) {
  346. return idxManager_->GetStreamIndex(stream_id);
  347. }
  348. return INVALID_STREAM_IDX;
  349. }
  350. void ReturnStreamIndex(const std::string& stream_id) {
  351. if (idxManager_) {
  352. idxManager_->ReturnStreamIndex(stream_id);
  353. }
  354. }
  355. size_t GetModuleIdx() {
  356. if (idxManager_) {
  357. return idxManager_->GetModuleIdx();
  358. }
  359. return INVALID_MODULE_ID;
  360. }
  361. void ReturnModuleIdx(size_t idx) {
  362. if (idxManager_) {
  363. idxManager_->ReturnModuleIdx(idx);
  364. }
  365. }
  366. }; // class Pipeline
  367. inline const std::string& Pipeline::GetName() const {
  368. return name_;
  369. }
  370. inline bool Pipeline::BuildPipeline(const std::vector<CNModuleConfig>& module_configs,
  371. const ProfilerConfig& profiler_config) {
  372. CNGraphConfig graph_config;
  373. graph_config.name = GetName();
  374. graph_config.module_configs = module_configs;
  375. graph_config.profiler_config = profiler_config;
  376. return BuildPipeline(graph_config);
  377. }
  378. inline bool Pipeline::BuildPipelineByJSONFile(const std::string& config_file) {
  379. CNGraphConfig graph_config;
  380. if (!graph_config.ParseByJSONFile(config_file)) {
  381. LOGE(CORE) << "Parse graph config file failed.";
  382. return false;
  383. }
  384. return BuildPipeline(graph_config);
  385. }
  386. inline bool Pipeline::IsRunning() const {
  387. return running_;
  388. }
  389. inline EventBus* Pipeline::GetEventBus() const {
  390. return event_bus_.get();
  391. }
  392. inline void Pipeline::SetStreamMsgObserver(StreamMsgObserver* observer) {
  393. smsg_observer_ = observer;
  394. }
  395. inline StreamMsgObserver* Pipeline::GetStreamMsgObserver() const {
  396. return smsg_observer_;
  397. }
  398. inline bool Pipeline::IsProfilingEnabled() const {
  399. return profiler_ ? profiler_->GetConfig().enable_profiling : false;
  400. }
  401. inline bool Pipeline::IsTracingEnabled() const {
  402. return profiler_ ? profiler_->GetConfig().enable_tracing : false;
  403. }
  404. inline PipelineProfiler* Pipeline::GetProfiler() const {
  405. return IsProfilingEnabled() ? profiler_.get() : nullptr;
  406. }
  407. inline PipelineTracer* Pipeline::GetTracer() const {
  408. return IsTracingEnabled() ? profiler_->GetTracer() : nullptr;
  409. }
  410. inline bool Pipeline::PassedByAllModules(uint64_t mask) const {
  411. return mask == all_modules_mask_;
  412. }
  413. inline void Pipeline::RegisterFrameDoneCallBack(const std::function<void(std::shared_ptr<CNFrameInfo>)>& callback) {
  414. frame_done_cb_ = callback;
  415. }
  416. } // namespace cnstream
  417. #endif // CNSTREAM_PIPELINE_HPP_