123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import os, sys
- sys.path.append(os.path.split(os.path.realpath(__file__))[0] + "/../lib")
- from cnstream import *
# Flag flipped by CustomSMO.__del__; lets a test check that an observer
# attached to a pipeline is kept alive by that pipeline.
smo_del_called = False


class CustomSMO(StreamMsgObserver):
    """Stream message observer that records its own destruction."""

    def __init__(self):
        # NOTE(review): kept as an explicit base-class call rather than
        # super(); presumably StreamMsgObserver is an extension-binding
        # type that wants direct initialization -- confirm before changing.
        StreamMsgObserver.__init__(self)

    def update(self, msg):
        """Ignore every stream message; only the object lifetime matters."""

    def __del__(self):
        # Record the destruction in the module-level flag.
        global smo_del_called
        smo_del_called = True
def CreateConfigWithSourceModule():
    """Return a graph config named "test_pipeline" with one DataSource module.

    Profiling is enabled, and the single module config is named
    "test_module" with class name "cnstream::DataSource".
    """
    source_config = CNModuleConfig()
    source_config.name = "test_module"
    source_config.class_name = "cnstream::DataSource"

    graph_config = CNGraphConfig()
    graph_config.name = "test_pipeline"
    graph_config.profiler_config.enable_profiling = True
    # Assign only after the module config is fully populated.
    graph_config.module_configs = [source_config]
    return graph_config
class TestPipeline:
    """Tests for the Python-facing ``Pipeline`` API."""

    def test_name(self):
        """The name passed to the constructor is returned by get_name()."""
        pipeline = Pipeline("test_pipeline")
        assert "test_pipeline" == pipeline.get_name()

    def test_build_pipeline(self):
        """A config with one source module builds and exposes that module."""
        # TODO: wait for config py api merge
        pipeline = Pipeline("test_pipeline")
        config = CreateConfigWithSourceModule()
        assert pipeline.build_pipeline(config)
        # Assert on the lookup instead of binding an unused local.
        assert pipeline.get_source_module("test_module") is not None

    def test_start(self):
        """start() succeeds and the pipeline then reports it is running."""
        pipeline = Pipeline("test_pipeline")
        assert pipeline.start()
        try:
            assert pipeline.is_running()
        finally:
            # Do not leave a started pipeline behind for later tests.
            pipeline.stop()

    def test_get_source_module(self):
        """Known module names resolve; unknown names yield None."""
        pipeline = Pipeline("test_pipeline")
        config = CreateConfigWithSourceModule()
        assert pipeline.build_pipeline(config)
        assert pipeline.get_source_module("test_module") is not None
        assert pipeline.get_source_module("test_none") is None

    def test_get_module_config(self):
        """The config returned for a module carries its class name."""
        pipeline = Pipeline("test_pipeline")
        config = CreateConfigWithSourceModule()
        assert pipeline.build_pipeline(config)
        mconfig = pipeline.get_module_config("test_module")
        assert "cnstream::DataSource" == mconfig.class_name

    def test_is_profiling_enabled(self):
        """Profiling enabled in the config is reported after building."""
        pipeline = Pipeline("test_pipeline")
        config = CNGraphConfig()
        config.profiler_config.enable_profiling = True
        assert pipeline.build_pipeline(config)
        assert pipeline.is_profiling_enabled()

    def test_is_tracing_enabled(self):
        """Tracing enabled in the config is reported after building."""
        pipeline = Pipeline("test_pipeline")
        config = CNGraphConfig()
        config.profiler_config.enable_tracing = True
        assert pipeline.build_pipeline(config)
        assert pipeline.is_tracing_enabled()

    def test_provide_data(self):
        """A started pipeline accepts a frame for its source module."""
        pipeline = Pipeline("test_pipeline")
        config = CreateConfigWithSourceModule()
        assert pipeline.build_pipeline(config)
        assert pipeline.start()
        try:
            source_module = pipeline.get_source_module("test_module")
            assert pipeline.provide_data(source_module, CNFrameInfo("test_stream"))
        finally:
            # Stop even when the assertion above fails.
            pipeline.stop()

    def test_stream_msg_observer(self):
        """An observer attached to a pipeline outlives its local reference."""
        global smo_del_called
        # Reset first: a previous run in the same process leaves the flag
        # True once its pipeline (and observer) has been destroyed.
        smo_del_called = False
        smo = CustomSMO()
        pipeline = Pipeline("test_pipeline")
        pipeline.stream_msg_observer = smo
        # test smo keep alive with pipeline
        smo = None
        assert not smo_del_called
        # TODO: test stream message update

    def test_is_root_node(self):
        """The source module is a root node; an unknown name is not."""
        pipeline = Pipeline("test_pipeline")
        config = CreateConfigWithSourceModule()
        assert pipeline.build_pipeline(config)
        assert pipeline.is_root_node("test_module")
        assert not pipeline.is_root_node("test_not")

    def test_register_frame_done_cb(self):
        """Placeholder until the data handler API is available."""
        # TODO: wait for data handler api
        pass
|