123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- #include <gtest/gtest.h>
- #include <cstdio>
- #include <cstdlib>
- #include <memory>
- #include <string>
- #include <utility>
- #include <vector>
- #include "cnstream_frame.hpp"
- #include "cnstream_pipeline.hpp"
- #include "test_base.hpp"
- static constexpr char kCNDataFrameTag[] = "CNDataFrame";
- namespace cnstream {
- class TPTestModule : public Module, public ModuleCreator<TPTestModule> {
- public:
- explicit TPTestModule(const std::string& name) : Module(name) {}
- bool Open(ModuleParamSet params) override {return true;}
- void Close() override {}
- int Process(std::shared_ptr<CNFrameInfo> frame_info) override {return 0;}
- };
- class TPTestStreamMsgObserver : public StreamMsgObserver {
- public:
- void Update(const StreamMsg& msg) override {}
- };
- TEST(CorePipeline, GetName) {
- Pipeline pipeline("test_pipeline");
- EXPECT_EQ("test_pipeline", pipeline.GetName());
- }
- TEST(CorePipeline, BuildPipelineByModuleConfig) {
-
- CNModuleConfig config1;
- config1.name = "modulea";
- config1.className = "cnstream::TPTestModule";
- config1.parallelism = 1;
- config1.maxInputQueueSize = 20;
- config1.next = {"moduleb"};
- CNModuleConfig config2;
- config2.name = "moduleb";
- config2.className = "cnstream::TPTestModule";
- config2.parallelism = 1;
- config2.maxInputQueueSize = 20;
- Pipeline pipeline("test_pipeline");
- EXPECT_TRUE(pipeline.BuildPipeline({config1, config2}));
-
- config2.className = "wrong_class_name";
- EXPECT_FALSE(pipeline.BuildPipeline({config1, config2}));
- }
- TEST(CorePipeline, BuildPipelineByGraphConfig) {
-
- CNModuleConfig config1;
- config1.name = "modulea";
- config1.className = "cnstream::TPTestModule";
- config1.parallelism = 1;
- config1.maxInputQueueSize = 20;
- config1.next = {"moduleb"};
- CNModuleConfig config2;
- config2.name = "moduleb";
- config2.className = "cnstream::TPTestModule";
- config2.parallelism = 1;
- config2.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config1, config2};
- Pipeline pipeline("test_pipeline");
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
-
- graph_config.module_configs[1].name = "modulea";
- EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
-
- graph_config.module_configs[1].name = "moduleb";
- graph_config.module_configs[1].className = "wrong_class_name";
- EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
-
- graph_config.module_configs[1] = config2;
- graph_config.module_configs[1].parallelism = 0;
- EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
-
- graph_config.module_configs[1] = config2;
- graph_config.module_configs[1].maxInputQueueSize = 0;
- EXPECT_FALSE(pipeline.BuildPipeline(graph_config));
- }
- TEST(CorePipeline, BuildPipelineByJSONFile) {
-
- std::pair<int, std::string> temp_file_desc = CreateTempFile("test_buildpipeline_config");
- std::string config_str = "{\n"
- "\"modulea\" : {\n"
- "\"class_name\" : \"cnstream::TPTestModule\",\n"
- "\"parallelism\" : 1,\n"
- "\"max_input_queue_size\" : 20,\n"
- "\"next_modules\" : [\"moduleb\"]\n"
- "},\n"
- "\"moduleb\" : {\n"
- "\"class_name\" : \"cnstream::TPTestModule\",\n"
- "\"parallelism\" : 1,\n"
- "\"max_input_queue_size\" : 20\n"
- "}\n"
- "}\n";
- EXPECT_EQ(config_str.size(), write(temp_file_desc.first, config_str.c_str(), config_str.size()))
- << "Write cofnig str to temp file for BuildPipelineByJSONFile test case failed! "
- << strerror(errno);
- Pipeline pipeline("test_pipeline");
- EXPECT_TRUE(pipeline.BuildPipelineByJSONFile(temp_file_desc.second));
-
- config_str[config_str.size() - 2] = ',';
- EXPECT_NE(-1, ftruncate(temp_file_desc.first, 0)) << "Clear temp file content failed. "
- << strerror(errno);
- EXPECT_EQ(config_str.size(), write(temp_file_desc.first, config_str.c_str(), config_str.size()))
- << "Write cofnig str to temp file for BuildPipelineByJSONFile test case failed! "
- << strerror(errno);
- EXPECT_FALSE(pipeline.BuildPipelineByJSONFile(temp_file_desc.second));
-
- config_str = "{\n"
- "\"modulea\" : {\n"
- "\"class_name\" : \"wrong_class_name\",\n"
- "\"parallelism\" : 1,\n"
- "\"max_input_queue_size\" : 20\n"
- "}\n"
- "}\n";
- EXPECT_NE(-1, ftruncate(temp_file_desc.first, 0)) << "Clear temp file content failed. "
- << strerror(errno);
- EXPECT_EQ(config_str.size(), write(temp_file_desc.first, config_str.c_str(), config_str.size()))
- << "Write cofnig str to temp file for BuildPipelineByJSONFile test case failed! "
- << strerror(errno);
- EXPECT_FALSE(pipeline.BuildPipelineByJSONFile(temp_file_desc.second));
-
- close(temp_file_desc.first);
- unlink(temp_file_desc.second.c_str());
- }
- namespace __test_module_open_failed__ {
- class TestModuleOpenFailed : public Module, public ModuleCreator<TestModuleOpenFailed> {
- public:
- explicit TestModuleOpenFailed(const std::string& name) : Module(name) {}
- bool Open(ModuleParamSet params) override {return false;}
- void Close() override {}
- int Process(std::shared_ptr<CNFrameInfo> frame_info) override {return 0;}
- };
- }
- TEST(CorePipeline, Start) {
-
- Pipeline pipeline("test_pipeline");
- CNModuleConfig config1;
- config1.name = "modulea";
- config1.className = "cnstream::TPTestModule";
- config1.parallelism = 1;
- config1.maxInputQueueSize = 20;
- config1.next = {"moduleb"};
- CNModuleConfig config2;
- config2.name = "moduleb";
- config2.className = "cnstream::TPTestModule";
- config2.parallelism = 1;
- config2.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config1, config2};
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
- EXPECT_TRUE(pipeline.Start());
- EXPECT_FALSE(pipeline.Start());
- EXPECT_TRUE(pipeline.IsRunning());
- pipeline.Stop();
-
- graph_config.module_configs[1].className = "cnstream::__test_module_open_failed__::TestModuleOpenFailed";
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
- EXPECT_FALSE(pipeline.Start());
- EXPECT_FALSE(pipeline.IsRunning());
- }
- TEST(CorePipeline, Stop) {
-
- Pipeline pipeline("test_pipeline");
- CNModuleConfig config1;
- config1.name = "modulea";
- config1.className = "cnstream::TPTestModule";
- config1.parallelism = 1;
- config1.maxInputQueueSize = 20;
- config1.next = {"moduleb"};
- CNModuleConfig config2;
- config2.name = "moduleb";
- config2.className = "cnstream::TPTestModule";
- config2.parallelism = 1;
- config2.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config1, config2};
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
- EXPECT_TRUE(pipeline.Stop());
-
- EXPECT_TRUE(pipeline.Start());
- EXPECT_TRUE(pipeline.Stop());
- }
- TEST(CorePipeline, IsRunning) {
-
- Pipeline pipeline("test_pipeline");
- EXPECT_FALSE(pipeline.IsRunning());
-
- EXPECT_TRUE(pipeline.Start());
- EXPECT_TRUE(pipeline.IsRunning());
-
- EXPECT_TRUE(pipeline.Stop());
- EXPECT_FALSE(pipeline.IsRunning());
- }
- TEST(CorePipeline, GetModule) {
- Pipeline pipeline("test_pipeline");
- CNModuleConfig config;
- config.name = "modulea";
- config.className = "cnstream::TPTestModule";
- config.parallelism = 1;
- config.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config};
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
-
- auto module = pipeline.GetModule("modulea");
- EXPECT_NE(nullptr, module);
-
- module = pipeline.GetModule("wrong_module_name");
- EXPECT_EQ(nullptr, module);
- }
- TEST(CorePipeline, GetModuleConfig) {
- Pipeline pipeline("test_pipeline");
- CNModuleConfig config;
- config.name = "modulea";
- config.className = "cnstream::TPTestModule";
- config.parallelism = 1;
- config.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config};
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
-
- auto module_config = pipeline.GetModuleConfig("modulea");
- EXPECT_EQ(module_config.name, config.name);
-
- module_config = pipeline.GetModuleConfig("wrong_module_name");
- EXPECT_TRUE(module_config.name.empty());
- }
- TEST(CorePipeline, IsProfilingEnabled) {
-
- Pipeline pipeline("test_pipeline");
- ProfilerConfig profiler_config;
- profiler_config.enable_profiling = true;
- EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
- EXPECT_TRUE(pipeline.IsProfilingEnabled());
-
- profiler_config.enable_profiling = false;
- EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
- EXPECT_FALSE(pipeline.IsProfilingEnabled());
- }
- TEST(CorePipeline, IsTracingEnabled) {
-
- Pipeline pipeline("test_pipeline");
- ProfilerConfig profiler_config;
- profiler_config.enable_tracing = true;
- EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
- EXPECT_TRUE(pipeline.IsTracingEnabled());
-
- profiler_config.enable_tracing = false;
- EXPECT_TRUE(pipeline.BuildPipeline({}, profiler_config));
- EXPECT_FALSE(pipeline.IsTracingEnabled());
- }
- TEST(CorePipeline, ProvideData) {
- Pipeline pipeline("test_pipeline");
- CNModuleConfig config1;
- config1.name = "modulea";
- config1.className = "cnstream::TPTestModule";
- config1.parallelism = 1;
- config1.maxInputQueueSize = 20;
- config1.next = {"moduleb"};
- CNModuleConfig config2;
- config2.name = "moduleb";
- config2.className = "cnstream::TPTestModule";
- config2.parallelism = 1;
- config2.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config1, config2};
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
- auto module = pipeline.GetModule("modulea");
- auto data = CNFrameInfo::Create("1");
-
- EXPECT_FALSE(pipeline.ProvideData(module, data));
- EXPECT_TRUE(pipeline.Start());
-
- EXPECT_FALSE(pipeline.ProvideData(nullptr, data));
-
- TPTestModule orphan("orphan");
- EXPECT_FALSE(pipeline.ProvideData(&orphan, data));
-
- EXPECT_FALSE(pipeline.ProvideData(pipeline.GetModule("moduleb"), data));
-
- EXPECT_TRUE(pipeline.ProvideData(module, data));
- pipeline.Stop();
- }
- TEST(CorePipeline, GetEventBus) {
- Pipeline pipeline("test_pipeline");
- EXPECT_NE(nullptr, pipeline.GetEventBus());
- }
- TEST(CorePipeline, SetStreamMsgObserver) {
- Pipeline pipeline("test_pipeline");
- TPTestStreamMsgObserver observer;
- pipeline.SetStreamMsgObserver(&observer);
- EXPECT_EQ(&observer, pipeline.GetStreamMsgObserver());
- }
- TEST(CorePipeline, GetStreamMsgObserver) {
- Pipeline pipeline("test_pipeline");
- TPTestStreamMsgObserver observer;
- EXPECT_EQ(nullptr, pipeline.GetStreamMsgObserver());
- pipeline.SetStreamMsgObserver(&observer);
- EXPECT_EQ(&observer, pipeline.GetStreamMsgObserver());
- }
- TEST(CorePipeline, GetProfiler) {
- Pipeline pipeline("test_pipeline");
- EXPECT_EQ(nullptr, pipeline.GetProfiler());
- ProfilerConfig profiler_config;
- profiler_config.enable_profiling = false;
- pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
- EXPECT_EQ(nullptr, pipeline.GetProfiler());
- profiler_config.enable_profiling = true;
- pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
- EXPECT_NE(nullptr, pipeline.GetProfiler());
- }
- TEST(CorePipeline, GetTracer) {
- Pipeline pipeline("test_pipeline");
- EXPECT_EQ(nullptr, pipeline.GetTracer());
- ProfilerConfig profiler_config;
- profiler_config.enable_tracing = false;
- pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
- EXPECT_EQ(nullptr, pipeline.GetTracer());
- profiler_config.enable_tracing = true;
- pipeline.BuildPipeline(std::vector<CNModuleConfig>(), profiler_config);
- EXPECT_NE(nullptr, pipeline.GetTracer());
- }
- TEST(CorePipeline, IsRootNode) {
- Pipeline pipeline("test_pipeline");
- CNModuleConfig config1;
- config1.name = "modulea";
- config1.className = "cnstream::TPTestModule";
- config1.parallelism = 1;
- config1.maxInputQueueSize = 20;
- config1.next = {"moduleb"};
- CNModuleConfig config2;
- config2.name = "moduleb";
- config2.className = "cnstream::TPTestModule";
- config2.parallelism = 1;
- config2.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config1, config2};
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
-
- EXPECT_FALSE(pipeline.IsRootNode("wrong_module_name"));
-
- EXPECT_FALSE(pipeline.IsRootNode("moduleb"));
-
- EXPECT_TRUE(pipeline.IsRootNode("modulea"));
- }
- TEST(CorePipeline, IsLeafNode) {
- Pipeline pipeline("test_pipeline");
- CNModuleConfig config1;
- config1.name = "modulea";
- config1.className = "cnstream::TPTestModule";
- config1.parallelism = 1;
- config1.maxInputQueueSize = 20;
- config1.next = {"moduleb"};
- CNModuleConfig config2;
- config2.name = "moduleb";
- config2.className = "cnstream::TPTestModule";
- config2.parallelism = 1;
- config2.maxInputQueueSize = 20;
- CNGraphConfig graph_config;
- graph_config.module_configs = {config1, config2};
- EXPECT_TRUE(pipeline.BuildPipeline(graph_config));
-
- EXPECT_FALSE(pipeline.IsLeafNode("wrong_module_name"));
-
- EXPECT_FALSE(pipeline.IsLeafNode("modulea"));
-
- EXPECT_TRUE(pipeline.IsLeafNode("moduleb"));
- }
- }
|