test_pipeline.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. /*************************************************************************
  2. * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #include <gtest/gtest.h>
  21. #include <cstdio>
  22. #include <cstdlib>
  23. #include <memory>
  24. #include <string>
  25. #include <utility>
  26. #include <vector>
  27. #include "cnstream_frame.hpp"
  28. #include "cnstream_pipeline.hpp"
  29. #include "test_base.hpp"
  30. static constexpr char kCNDataFrameTag[] = "CNDataFrame";
  31. namespace cnstream {
  32. class TPTestModule : public Module, public ModuleCreator<TPTestModule> {
  33. public:
  34. explicit TPTestModule(const std::string& name) : Module(name) {}
  35. bool Open(ModuleParamSet params) override {return true;}
  36. void Close() override {}
  37. int Process(std::shared_ptr<CNFrameInfo> frame_info) override {return 0;}
  38. }; // class TPTestModule
  39. class TPTestStreamMsgObserver : public StreamMsgObserver {
  40. public:
  41. void Update(const StreamMsg& msg) override {}
  42. }; // class TPTestStreamMsgObserver
  43. TEST(CorePipeline, GetName) {
  44. Pipeline pipeline("test_pipeline");
  45. EXPECT_EQ("test_pipeline", pipeline.GetName());
  46. }
  47. TEST(CorePipeline, BuildPipelineByModuleConfig) {
  48. // case1: right configs
  49. CNModuleConfig config1;
  50. config1.name = "modulea";
  51. config1.className = "cnstream::TPTestModule";
  52. config1.parallelism = 1;
  53. config1.maxInputQueueSize = 20;
  54. config1.next = {"moduleb"};
  55. CNModuleConfig config2;
  56. config2.name = "moduleb";
  57. config2.className = "cnstream::TPTestModule";
  58. config2.parallelism = 1;
  59. config2.maxInputQueueSize = 20;
  60. Pipeline pipeline("test_pipeline");
  61. EXPECT_TRUE(pipeline.BuildPipeline({config1, config2}));
  62. // case2: wrong configs
  63. config2.className = "wrong_class_name";
  64. EXPECT_FALSE(pipeline.BuildPipeline({config1, config2}));
  65. }
  66. TEST(CorePipeline, BuildPipelineByGraphConfig) {
  67. // case1: right graph config
  68. CNModuleConfig config1;
  69. config1.name = "modulea";
  70. config1.className = "cnstream::TPTestModule";
  71. config1.parallelism = 1;
  72. config1.maxInputQueueSize = 20;
  73. config1.next = {"moduleb"};
  74. CNModuleConfig config2;
  75. config2.name = "moduleb";
  76. config2.className = "cnstream::TPTestModule";
  77. config2.parallelism = 1;
  78. config2.maxInputQueueSize = 20;
  79. CNGraphConfig graph_config;
  80. graph_config.module_configs = {config1, config2};
  81. Pipeline pipeline("test_pipeline");
  82. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  83. // case2: wrong graph config(duplicated module name)
  84. graph_config.module_configs[1].name = "modulea";
  85. EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
  86. // case3: create modules failed(wrong class name)
  87. graph_config.module_configs[1].name = "moduleb";
  88. graph_config.module_configs[1].className = "wrong_class_name";
  89. EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
  90. // case4: parallelism is zero
  91. graph_config.module_configs[1] = config2;
  92. graph_config.module_configs[1].parallelism = 0;
  93. EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
  94. // case4: max_input_queue_size is zero
  95. graph_config.module_configs[1] = config2;
  96. graph_config.module_configs[1].maxInputQueueSize = 0;
  97. EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
  98. }
  99. TEST(CorePipeline, BuildPipelineByJSONFile) {
  100. // case1: right graph config
  101. std::pair<int, std::string> temp_file_desc = CreateTempFile("test_buildpipeline_config");
  102. std::string config_str = "{\n"
  103. "\"modulea\" : {\n"
  104. "\"class_name\" : \"cnstream::TPTestModule\",\n"
  105. "\"parallelism\" : 1,\n"
  106. "\"max_input_queue_size\" : 20,\n"
  107. "\"next_modules\" : [\"moduleb\"]\n"
  108. "},\n"
  109. "\"moduleb\" : {\n"
  110. "\"class_name\" : \"cnstream::TPTestModule\",\n"
  111. "\"parallelism\" : 1,\n"
  112. "\"max_input_queue_size\" : 20\n"
  113. "}\n"
  114. "}\n";
  115. EXPECT_EQ(config_str.size(), write(temp_file_desc.first, config_str.c_str(), config_str.size()))
  116. << "Write cofnig str to temp file for BuildPipelineByJSONFile test case failed! "
  117. << strerror(errno);
  118. Pipeline pipeline("test_pipeline");
  119. EXPECT_TRUE(pipeline.BuildPipelineByJSONFile(temp_file_desc.second));
  120. // case2: wrong json format
  121. config_str[config_str.size() - 2] = ',';
  122. EXPECT_NE(-1, ftruncate(temp_file_desc.first, 0)) << "Clear temp file content failed. "
  123. << strerror(errno);
  124. EXPECT_EQ(config_str.size(), write(temp_file_desc.first, config_str.c_str(), config_str.size()))
  125. << "Write cofnig str to temp file for BuildPipelineByJSONFile test case failed! "
  126. << strerror(errno);
  127. EXPECT_FALSE(pipeline.BuildPipelineByJSONFile(temp_file_desc.second));
  128. // case3: wrong graph config
  129. config_str = "{\n"
  130. "\"modulea\" : {\n"
  131. "\"class_name\" : \"wrong_class_name\",\n"
  132. "\"parallelism\" : 1,\n"
  133. "\"max_input_queue_size\" : 20\n"
  134. "}\n"
  135. "}\n";
  136. EXPECT_NE(-1, ftruncate(temp_file_desc.first, 0)) << "Clear temp file content failed. "
  137. << strerror(errno);
  138. EXPECT_EQ(config_str.size(), write(temp_file_desc.first, config_str.c_str(), config_str.size()))
  139. << "Write cofnig str to temp file for BuildPipelineByJSONFile test case failed! "
  140. << strerror(errno);
  141. EXPECT_FALSE(pipeline.BuildPipelineByJSONFile(temp_file_desc.second));
  142. // remove temp file
  143. close(temp_file_desc.first);
  144. unlink(temp_file_desc.second.c_str());
  145. }
  146. namespace __test_module_open_failed__ {
  147. class TestModuleOpenFailed : public Module, public ModuleCreator<TestModuleOpenFailed> {
  148. public:
  149. explicit TestModuleOpenFailed(const std::string& name) : Module(name) {}
  150. bool Open(ModuleParamSet params) override {return false;}
  151. void Close() override {}
  152. int Process(std::shared_ptr<CNFrameInfo> frame_info) override {return 0;}
  153. }; // class TestModuleOpenFailed
  154. } // namespace __test_module_open_failed__
  155. TEST(CorePipeline, Start) {
  156. // case1: start twice
  157. Pipeline pipeline("test_pipeline");
  158. CNModuleConfig config1;
  159. config1.name = "modulea";
  160. config1.className = "cnstream::TPTestModule";
  161. config1.parallelism = 1;
  162. config1.maxInputQueueSize = 20;
  163. config1.next = {"moduleb"};
  164. CNModuleConfig config2;
  165. config2.name = "moduleb";
  166. config2.className = "cnstream::TPTestModule";
  167. config2.parallelism = 1;
  168. config2.maxInputQueueSize = 20;
  169. CNGraphConfig graph_config;
  170. graph_config.module_configs = {config1, config2};
  171. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  172. EXPECT_TRUE(pipeline.Start());
  173. EXPECT_FALSE(pipeline.Start());
  174. EXPECT_TRUE(pipeline.IsRunning());
  175. pipeline.Stop();
  176. // case2: open module failed.
  177. graph_config.module_configs[1].className = "cnstream::__test_module_open_failed__::TestModuleOpenFailed";
  178. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  179. EXPECT_FALSE(pipeline.Start());
  180. EXPECT_FALSE(pipeline.IsRunning());
  181. }
  182. TEST(CorePipeline, Stop) {
  183. // case1: stop before start
  184. Pipeline pipeline("test_pipeline");
  185. CNModuleConfig config1;
  186. config1.name = "modulea";
  187. config1.className = "cnstream::TPTestModule";
  188. config1.parallelism = 1;
  189. config1.maxInputQueueSize = 20;
  190. config1.next = {"moduleb"};
  191. CNModuleConfig config2;
  192. config2.name = "moduleb";
  193. config2.className = "cnstream::TPTestModule";
  194. config2.parallelism = 1;
  195. config2.maxInputQueueSize = 20;
  196. CNGraphConfig graph_config;
  197. graph_config.module_configs = {config1, config2};
  198. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  199. EXPECT_TRUE(pipeline.Stop());
  200. // case2: stop success
  201. EXPECT_TRUE(pipeline.Start());
  202. EXPECT_TRUE(pipeline.Stop());
  203. }
  204. TEST(CorePipeline, IsRunning) {
  205. // case1: before start, not running
  206. Pipeline pipeline("test_pipeline");
  207. EXPECT_FALSE(pipeline.IsRunning());
  208. // case2: after start, running
  209. EXPECT_TRUE(pipeline.Start());
  210. EXPECT_TRUE(pipeline.IsRunning());
  211. // case3: after stop, not running
  212. EXPECT_TRUE(pipeline.Stop());
  213. EXPECT_FALSE(pipeline.IsRunning());
  214. }
  215. TEST(CorePipeline, GetModule) {
  216. Pipeline pipeline("test_pipeline");
  217. CNModuleConfig config;
  218. config.name = "modulea";
  219. config.className = "cnstream::TPTestModule";
  220. config.parallelism = 1;
  221. config.maxInputQueueSize = 20;
  222. CNGraphConfig graph_config;
  223. graph_config.module_configs = {config};
  224. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  225. // case1: right module name
  226. auto module = pipeline.GetModule("modulea");
  227. EXPECT_NE(nullptr, module);
  228. // case1: wrong module name
  229. module = pipeline.GetModule("wrong_module_name");
  230. EXPECT_EQ(nullptr, module);
  231. }
  232. TEST(CorePipeline, GetModuleConfig) {
  233. Pipeline pipeline("test_pipeline");
  234. CNModuleConfig config;
  235. config.name = "modulea";
  236. config.className = "cnstream::TPTestModule";
  237. config.parallelism = 1;
  238. config.maxInputQueueSize = 20;
  239. CNGraphConfig graph_config;
  240. graph_config.module_configs = {config};
  241. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  242. // case1: right module name
  243. auto module_config = pipeline.GetModuleConfig("modulea");
  244. EXPECT_EQ(module_config.name, config.name);
  245. // case1: wrong module name
  246. module_config = pipeline.GetModuleConfig("wrong_module_name");
  247. EXPECT_TRUE(module_config.name.empty());
  248. }
  249. TEST(CorePipeline, IsProfilingEnabled) {
  250. // case1: true
  251. Pipeline pipeline("test_pipeline");
  252. ProfilerConfig profiler_config;
  253. profiler_config.enable_profiling = true;
  254. EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
  255. EXPECT_TRUE(pipeline.IsProfilingEnabled());
  256. // case1: false
  257. profiler_config.enable_profiling = false;
  258. EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
  259. EXPECT_FALSE(pipeline.IsProfilingEnabled());
  260. }
  261. TEST(CorePipeline, IsTracingEnabled) {
  262. // case1: true
  263. Pipeline pipeline("test_pipeline");
  264. ProfilerConfig profiler_config;
  265. profiler_config.enable_tracing = true;
  266. EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
  267. EXPECT_TRUE(pipeline.IsTracingEnabled());
  268. // case1: false
  269. profiler_config.enable_tracing = false;
  270. EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
  271. EXPECT_FALSE(pipeline.IsTracingEnabled());
  272. }
  273. TEST(CorePipeline, ProvideData) {
  274. Pipeline pipeline("test_pipeline");
  275. CNModuleConfig config1;
  276. config1.name = "modulea";
  277. config1.className = "cnstream::TPTestModule";
  278. config1.parallelism = 1;
  279. config1.maxInputQueueSize = 20;
  280. config1.next = {"moduleb"};
  281. CNModuleConfig config2;
  282. config2.name = "moduleb";
  283. config2.className = "cnstream::TPTestModule";
  284. config2.parallelism = 1;
  285. config2.maxInputQueueSize = 20;
  286. CNGraphConfig graph_config;
  287. graph_config.module_configs = {config1, config2};
  288. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  289. auto module = pipeline.GetModule("modulea");
  290. auto data = CNFrameInfo::Create("1");
  291. // case1: provide data before pipeline running
  292. EXPECT_FALSE(pipeline.ProvideData(module, data));
  293. EXPECT_TRUE(pipeline.Start());
  294. // case2: provide data with an invalid module
  295. EXPECT_FALSE(pipeline.ProvideData(nullptr, data));
  296. // case3: provide data with an module not created by current pipeline
  297. TPTestModule orphan("orphan");
  298. EXPECT_FALSE(pipeline.ProvideData(&orphan, data));
  299. // case4: provide data with an module which is not a root node
  300. EXPECT_FALSE(pipeline.ProvideData(pipeline.GetModule("moduleb"), data));
  301. // case5: provide success
  302. EXPECT_TRUE(pipeline.ProvideData(module, data));
  303. pipeline.Stop();
  304. }
  305. TEST(CorePipeline, GetEventBus) {
  306. Pipeline pipeline("test_pipeline");
  307. EXPECT_NE(nullptr, pipeline.GetEventBus());
  308. }
  309. TEST(CorePipeline, SetStreamMsgObserver) {
  310. Pipeline pipeline("test_pipeline");
  311. TPTestStreamMsgObserver observer;
  312. pipeline.SetStreamMsgObserver(&observer);
  313. EXPECT_EQ(&observer, pipeline.GetStreamMsgObserver());
  314. }
  315. TEST(CorePipeline, GetStreamMsgObserver) {
  316. Pipeline pipeline("test_pipeline");
  317. TPTestStreamMsgObserver observer;
  318. EXPECT_EQ(nullptr, pipeline.GetStreamMsgObserver());
  319. pipeline.SetStreamMsgObserver(&observer);
  320. EXPECT_EQ(&observer, pipeline.GetStreamMsgObserver());
  321. }
  322. TEST(CorePipeline, GetProfiler) {
  323. Pipeline pipeline("test_pipeline");
  324. EXPECT_EQ(nullptr, pipeline.GetProfiler());
  325. ProfilerConfig profiler_config;
  326. profiler_config.enable_profiling = false;
  327. pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
  328. EXPECT_EQ(nullptr, pipeline.GetProfiler());
  329. profiler_config.enable_profiling = true;
  330. pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
  331. EXPECT_NE(nullptr, pipeline.GetProfiler());
  332. }
  333. TEST(CorePipeline, GetTracer) {
  334. Pipeline pipeline("test_pipeline");
  335. EXPECT_EQ(nullptr, pipeline.GetTracer());
  336. ProfilerConfig profiler_config;
  337. profiler_config.enable_tracing = false;
  338. pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
  339. EXPECT_EQ(nullptr, pipeline.GetTracer());
  340. profiler_config.enable_tracing = true;
  341. pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
  342. EXPECT_NE(nullptr, pipeline.GetTracer());
  343. }
  344. TEST(CorePipeline, IsRootNode) {
  345. Pipeline pipeline("test_pipeline");
  346. CNModuleConfig config1;
  347. config1.name = "modulea";
  348. config1.className = "cnstream::TPTestModule";
  349. config1.parallelism = 1;
  350. config1.maxInputQueueSize = 20;
  351. config1.next = {"moduleb"};
  352. CNModuleConfig config2;
  353. config2.name = "moduleb";
  354. config2.className = "cnstream::TPTestModule";
  355. config2.parallelism = 1;
  356. config2.maxInputQueueSize = 20;
  357. CNGraphConfig graph_config;
  358. graph_config.module_configs = {config1, config2};
  359. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  360. // case1: wrong module name
  361. EXPECT_FALSE(pipeline.IsRootNode("wrong_module_name"));
  362. // case2: not a root node
  363. EXPECT_FALSE(pipeline.IsRootNode("moduleb"));
  364. // case3: is a root node
  365. EXPECT_TRUE(pipeline.IsRootNode("modulea"));
  366. }
  367. TEST(CorePipeline, IsLeafNode) {
  368. Pipeline pipeline("test_pipeline");
  369. CNModuleConfig config1;
  370. config1.name = "modulea";
  371. config1.className = "cnstream::TPTestModule";
  372. config1.parallelism = 1;
  373. config1.maxInputQueueSize = 20;
  374. config1.next = {"moduleb"};
  375. CNModuleConfig config2;
  376. config2.name = "moduleb";
  377. config2.className = "cnstream::TPTestModule";
  378. config2.parallelism = 1;
  379. config2.maxInputQueueSize = 20;
  380. CNGraphConfig graph_config;
  381. graph_config.module_configs = {config1, config2};
  382. EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
  383. // case1: wrong module name
  384. EXPECT_FALSE(pipeline.IsLeafNode("wrong_module_name"));
  385. // case2: not a leaf node
  386. EXPECT_FALSE(pipeline.IsLeafNode("modulea"));
  387. // case3: is a leaf node
  388. EXPECT_TRUE(pipeline.IsLeafNode("moduleb"));
  389. }
  390. } // namespace cnstream