pipeline_test.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. import os, sys
  2. sys.path.append(os.path.split(os.path.realpath(__file__))[0] + "/../lib")
  3. from cnstream import *
  4. # Set to True by CustomSMO.__del__; used to test that a pipeline keeps its stream message observer alive.
  5. smo_del_called = False
  6. class CustomSMO(StreamMsgObserver):
  7. def __init__(self):
  8. StreamMsgObserver.__init__(self)
  9. def __del__(self):
  10. global smo_del_called
  11. smo_del_called = True
  12. def update(self, msg):
  13. pass
  14. def CreateConfigWithSourceModule():
  15. config = CNGraphConfig()
  16. config.name = "test_pipeline"
  17. config.profiler_config.enable_profiling = True
  18. mconfig = CNModuleConfig()
  19. mconfig.name = "test_module"
  20. mconfig.class_name = "cnstream::DataSource"
  21. config.module_configs = [mconfig]
  22. return config
  23. class TestPipeline:
  24. def test_name(self):
  25. pipeline = Pipeline("test_pipeline")
  26. assert "test_pipeline" == pipeline.get_name()
  27. def test_build_pipeline(self):
  28. # TODO: wait for config py api merge
  29. pipeline = Pipeline("test_pipeline")
  30. config = CreateConfigWithSourceModule()
  31. assert pipeline.build_pipeline(config)
  32. source_module = pipeline.get_source_module("test_module")
  33. def test_start(self):
  34. pipeline = Pipeline("test_pipeline")
  35. assert pipeline.start()
  36. assert pipeline.is_running()
  37. def test_get_source_module(self):
  38. pipeline = Pipeline("test_pipeline")
  39. config = CreateConfigWithSourceModule()
  40. assert pipeline.build_pipeline(config)
  41. assert None != pipeline.get_source_module("test_module")
  42. assert None == pipeline.get_source_module("test_none")
  43. def test_get_module_config(self):
  44. pipeline = Pipeline("test_pipeline")
  45. config = CreateConfigWithSourceModule()
  46. assert pipeline.build_pipeline(config)
  47. mconfig = pipeline.get_module_config("test_module")
  48. assert "cnstream::DataSource" == mconfig.class_name
  49. def test_is_profiling_enabled(self):
  50. pipeline = Pipeline("test_pipeline")
  51. config = CNGraphConfig()
  52. config.profiler_config.enable_profiling = True
  53. assert pipeline.build_pipeline(config)
  54. assert pipeline.is_profiling_enabled()
  55. def test_is_tracing_enabled(self):
  56. pipeline = Pipeline("test_pipeline")
  57. config = CNGraphConfig()
  58. config.profiler_config.enable_tracing = True
  59. assert pipeline.build_pipeline(config)
  60. assert pipeline.is_tracing_enabled()
  61. def test_provide_data(self):
  62. pipeline = Pipeline("test_pipeline")
  63. config = CreateConfigWithSourceModule()
  64. assert pipeline.build_pipeline(config)
  65. assert pipeline.start()
  66. source_module = pipeline.get_source_module("test_module")
  67. assert pipeline.provide_data(source_module, CNFrameInfo("test_stream"))
  68. pipeline.stop()
  69. def test_stream_msg_observer(self):
  70. smo = CustomSMO()
  71. pipeline = Pipeline("test_pipeline")
  72. pipeline.stream_msg_observer = smo
  73. # test smo keep alive with pipeline
  74. smo = None
  75. global smo_del_called
  76. assert not smo_del_called
  77. # TODO: test stream message update
  78. def test_is_root_node(self):
  79. pipeline = Pipeline("test_pipeline")
  80. config = CreateConfigWithSourceModule()
  81. assert pipeline.build_pipeline(config)
  82. assert pipeline.is_root_node("test_module")
  83. assert not pipeline.is_root_node("test_not")
  84. def test_register_frame_done_cb(self):
  85. # TODO: wait for data handler api
  86. pass