cnstream_module.hpp 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. /*************************************************************************
  2. * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
  3. *
  4. * This source code is licensed under the Apache-2.0 license found in the
  5. * LICENSE file in the root directory of this source tree.
  6. *
  7. * A part of this source code is referenced from Nebula project.
  8. * https://github.com/Bwar/Nebula/blob/master/src/actor/DynamicCreator.hpp
  9. * https://github.com/Bwar/Nebula/blob/master/src/actor/ActorFactory.hpp
  10. *
  11. * Copyright (C) Bwar.
  12. *
  13. * This source code is licensed under the Apache-2.0 license found in the
  14. * LICENSE file in the root directory of this source tree.
  15. *
  16. *************************************************************************/
  17. #ifndef CNSTREAM_MODULE_HPP_
  18. #define CNSTREAM_MODULE_HPP_
  19. /**
  20. * @file cnstream_module.hpp
  21. *
  22. * This file contains a declaration of the Module class and the ModuleFactory class.
  23. */
  24. #include <cxxabi.h>
  25. #include <unistd.h>
  26. #include <atomic>
  27. #include <functional>
  28. #include <list>
  29. #include <memory>
  30. #include <string>
  31. #include <thread>
  32. #include <typeinfo>
  33. #include <unordered_map>
  34. #include <utility>
  35. #include <vector>
  36. #include "cnstream_common.hpp"
  37. #include "cnstream_config.hpp"
  38. #include "cnstream_eventbus.hpp"
  39. #include "cnstream_frame.hpp"
  40. #include "cnstream_logging.hpp"
  41. #include "private/cnstream_module_pri.hpp"
  42. #include "util/cnstream_queue.hpp"
  43. #include "util/cnstream_rwlock.hpp"
  44. namespace cnstream {
  45. class Pipeline;
  46. class ModuleProfiler;
  47. struct NodeContext;
  48. /**
  49. * @class IModuleObserver
  50. *
  51. * @brief IModuleObserver is an interface class. Users need to implement an observer
  52. * based on this, and register it to one module.
  53. */
  54. class IModuleObserver {
  55. public:
  56. /**
  57. * @brief Notifies "data" after being processed by this module.
  58. *
  59. * @param[in] data The frame that is notified to observer.
  60. *
  61. * @return No return value.
  62. */
  63. virtual void notify(std::shared_ptr<CNFrameInfo> data) = 0;
  64. /**
  65. * @brief Default destructor. A destructor to destruct module observer.
  66. *
  67. * @return No return value.
  68. */
  69. virtual ~IModuleObserver() = default;
  70. };
  71. /**
  72. * @class Module.
  73. *
  74. * @brief Module is the parent class of all modules. A module could have configurable
  75. * number of upstream links and downstream links.
  76. * Some modules are already constructed with a framework,
  77. * such as source, inferencer, and so on. You can also design your own modules.
  78. */
  79. class Module : private NonCopyable {
  80. public:
  81. /**
  82. * @brief Constructor. A constructor to construct module object.
  83. *
  84. * @param[in] name The name of a module. Modules defined in a pipeline must have different names.
  85. *
  86. * @return No return value.
  87. */
  88. explicit Module(const std::string &name) : name_(name) {}
  89. /**
  90. * @brief Destructor. A destructor to destruct module instance.
  91. *
  92. * @return No return value.
  93. */
  94. virtual ~Module();
  95. /**
  96. * @brief Registers an observer to the module.
  97. *
  98. * @param[in] observer An observer you defined.
  99. *
  100. * @return No return value.
  101. */
  102. void SetObserver(IModuleObserver *observer) {
  103. RwLockWriteGuard guard(observer_lock_);
  104. observer_ = observer;
  105. }
  106. /**
  107. * @brief Opens resources for a module.
  108. *
  109. * @param[in] param_set A set of parameters for this module.
  110. *
  111. * @return Returns true if this function has run successfully. Otherwise, returns false.
  112. *
  113. * @note You do not need to call this function by yourself. This function is called
  114. * by pipeline automatically when the pipeline is started. The pipeline calls the ``Process`` function
  115. * of this module automatically after the ``Open`` function is done.
  116. */
  117. virtual bool Open(ModuleParamSet param_set) = 0;
  118. /**
  119. * @brief Closes resources for a module.
  120. *
  121. * @return No return value.
  122. *
  123. * @note You do not need to call this function by yourself. This function is called
  124. * by pipeline automatically when the pipeline is stopped. The pipeline calls the ``Close`` function
  125. * of this module automatically after the ``Open`` and ``Process`` functions are done.
  126. */
  127. virtual void Close() = 0;
  128. /**
  129. * @brief Processes data.
  130. *
  131. * @param[in] data The data to be processed by the module.
  132. *
  133. * @retval 0: The data is processed successfully. The data should be transmitted in the framework then.
  134. * @retval >0: The data is processed successfully. The data has been handled by this module. The ``hasTransmit_`` must
  135. * be set. The Pipeline::ProvideData should be called by Module to transmit data to the next modules in the pipeline.
  136. * @retval <0: Pipeline will post an event with the EVENT_ERROR event type and return
  137. * number.
  138. */
  139. virtual int Process(std::shared_ptr<CNFrameInfo> data) = 0;
  140. /**
  141. * @brief Notifies flow-EOS arriving, the module should reset internal status if needed.
  142. *
  143. * @param[in] stream_id The stream identification.
  144. *
  145. * @note This function will be invoked when flow-EOS is forwarded by the framework.
  146. */
  147. virtual void OnEos(const std::string &stream_id) {}
  148. /**
  149. * @brief Gets the name of this module.
  150. *
  151. * @return Returns the name of this module.
  152. */
  153. inline std::string GetName() const { return name_; }
  154. /**
  155. * @brief Posts an event to the pipeline.
  156. *
  157. * @param[in] type The type of an event.
  158. * @param[in] msg The event message string.
  159. *
  160. * @return Returns true if this function has run successfully. Returns false if this
  161. * module has not been added to the pipeline.
  162. */
  163. bool PostEvent(EventType type, const std::string &msg);
  164. /**
  165. * @brief Posts an event to the pipeline.
  166. *
  167. * @param[in] Event with event type, stream_id, message, module name and thread_id.
  168. *
  169. * @return Returns true if this function has run successfully. Returns false if this
  170. * module has not been added to the pipeline.
  171. */
  172. bool PostEvent(Event e);
  173. /**
  174. * @brief Transmits data to the following stages.
  175. *
  176. * Valid when the module has permission to transmit data by itself.
  177. *
  178. * @param[in] data A pointer to the information of the frame.
  179. *
  180. * @return Returns true if the data has been transmitted successfully. Otherwise, returns false.
  181. */
  182. bool TransmitData(std::shared_ptr<CNFrameInfo> data);
  183. /**
  184. * @brief Checks parameters for a module, including parameter name, type, value, validity, and so on.
  185. *
  186. * @param[in] paramSet Parameters for this module.
  187. *
  188. * @return Returns true if this function has run successfully. Otherwise, returns false.
  189. */
  190. virtual bool CheckParamSet(const ModuleParamSet &paramSet) const { return true; }
  191. /**
  192. * @brief Gets the pipeline this module belongs to.
  193. *
  194. * @return Returns the pointer to pipeline instance.
  195. */
  196. Pipeline* GetContainer() const { return container_; }
  197. /**
  198. * @brief Gets module profiler.
  199. *
  200. * @return Returns a pointer to the module's profiler.
  201. */
  202. ModuleProfiler* GetProfiler();
  203. /**
  204. * @brief Checks if this module has permission to transmit data by itself.
  205. *
  206. * @return Returns true if this module has permission to transmit data by itself. Otherwise, returns false.
  207. *
  208. * @see Process
  209. */
  210. bool HasTransmit() const { return hasTransmit_.load(); }
  211. /**
  212. * Each module registers its own parameters and descriptions.
  213. * CNStream Inspect tool uses this class to detect parameters of each module.
  214. */
  215. ParamRegister param_register_;
  216. #ifdef UNIT_TEST
  217. public: // NOLINT
  218. #else
  219. protected: // NOLINT
  220. #endif
  221. friend class Pipeline;
  222. friend class CNFrameInfo;
  223. /**
  224. * @brief Sets a container to this module and identifies which pipeline the module is added to.
  225. *
  226. * @param[in] container A pipeline pointer to the container of this module.
  227. *
  228. * @note This function is called automatically by the pipeline after this module
  229. * is added into the pipeline. You do not need to call this function by yourself.
  230. */
  231. void SetContainer(Pipeline *container);
  232. /**
  233. * @brief Processes the data. This function is called by a pipeline.
  234. *
  235. * @param[in] data A pointer to the information of the frame.
  236. *
  237. * @retval 0: The process has been run successfully. The data should be transmitted by framework then.
  238. * @retval >0: The process has been run successfully. The data has been handled by this module. The ``hasTransmit_``
  239. * must be set. The Pipeline::ProvideData should be called by Module to transmit data to the next modules in the
  240. * pipeline.
  241. * @retval <0: Pipeline posts an event with the EVENT_ERROR event type and return number.
  242. */
  243. int DoProcess(std::shared_ptr<CNFrameInfo> data);
  244. Pipeline *container_ = nullptr; ///< The container.
  245. RwLock container_lock_;
  246. std::string name_; ///< The name of the module.
  247. std::atomic<bool> hasTransmit_{false}; ///< Whether it has permission to transmit data.
  248. #ifdef UNIT_TEST
  249. public: // NOLINT
  250. #else
  251. private: // NOLINT
  252. #endif
  253. IModuleObserver *observer_ = nullptr;
  254. RwLock observer_lock_;
  255. void NotifyObserver(std::shared_ptr<CNFrameInfo> data) {
  256. RwLockReadGuard guard(observer_lock_);
  257. if (observer_) {
  258. observer_->notify(data);
  259. }
  260. }
  261. int DoTransmitData(std::shared_ptr<CNFrameInfo> data);
  262. size_t GetId();
  263. size_t id_ = INVALID_MODULE_ID;
  264. NodeContext* context_ = nullptr; // used by pipeline
  265. };
  266. /**
  267. * @class ModuleEx
  268. *
  269. * @brief ModuleEx is the base class of the modules who have permission to transmit processed data by themselves.
  270. */
  271. class ModuleEx : public Module {
  272. public:
  273. /**
  274. * @brief Constructor. A constructor to construct the module which has permission to transmit processed data by
  275. * itself.
  276. *
  277. * @param[in] name The name of a module. Modules defined in a pipeline must have different names.
  278. *
  279. * @return No return value.
  280. */
  281. explicit ModuleEx(const std::string &name) : Module(name) { hasTransmit_.store(true); }
  282. };
  283. } // namespace cnstream
  284. #endif // CNSTREAM_MODULE_HPP_