# pycnstream_demo.py — CNStream Python pipeline demo (YOLOv3 detection on video files).
import os, sys
# Make the CNStream Python bindings (../lib relative to this script) importable.
sys.path.append(os.path.split(os.path.realpath(__file__))[0] + "/../lib")
from cnstream import *
import time
import threading
import cv2

# Serializes source add/remove between main() and the pipeline's
# message-observer callback thread.
g_source_lock = threading.Lock()
# Protects g_perf_print_stop, the shutdown flag for the perf-print thread.
g_perf_print_lock = threading.Lock()
g_perf_print_stop = False
# Directory containing this script; dumped frames go under <dir>/output.
cur_file_dir = os.path.split(os.path.realpath(__file__))[0]
  11. class CustomObserver(StreamMsgObserver):
  12. def __init__(self, pipeline, source):
  13. StreamMsgObserver.__init__(self)
  14. self.pipeline = pipeline
  15. self.source = source
  16. self.stop = False
  17. self.wakener = threading.Condition()
  18. self.stream_set = set()
  19. def update(self, msg):
  20. global g_source_lock
  21. g_source_lock.acquire()
  22. self.wakener.acquire()
  23. if self.stop:
  24. return
  25. if msg.type == StreamMsgType.eos_msg:
  26. print("pipeline[{}] stream[{}] gets EOS".format(self.pipeline.get_name(), msg.stream_id))
  27. if msg.stream_id in self.stream_set:
  28. self.source.remove_source(msg.stream_id)
  29. self.stream_set.remove(msg.stream_id)
  30. if len(self.stream_set) == 0:
  31. print("pipeline[{}] received all EOS".format(self.pipeline.get_name()))
  32. self.stop = True
  33. elif msg.type == StreamMsgType.stream_err_msg:
  34. print("pipeline[{}] stream[{}] gets stream error".format(self.pipeline.get_name(), msg.stream_id))
  35. if msg.stream_id in self.stream_set:
  36. self.source.remove_source(msg.stream_id, True)
  37. self.stream_set.remove(msg.stream_id)
  38. if len(self.stream_set) == 0:
  39. print("pipeline[{}] received all EOS".format(self.pipeline.get_name()))
  40. self.stop = True
  41. elif msg.type == StreamMsgType.error_msg:
  42. print("pipeline[{}] gets error".format(self.pipeline.get_name()))
  43. self.source.remove_sources(True)
  44. self.stream_set.clear()
  45. self.stop = True
  46. elif msg.type == StreamMsgType.frame_err_msg:
  47. print("pipeline[{}] stream[{}] gets frame error".format(self.pipeline.get_name(), msg.stream_id))
  48. else:
  49. print("pipeline[{}] unknown message type".format(self.pipeline.get_name()))
  50. if self.stop:
  51. self.wakener.notify()
  52. self.wakener.release()
  53. g_source_lock.release()
  54. def wait_for_stop(self):
  55. self.wakener.acquire()
  56. if len(self.stream_set) == 0:
  57. self.stop = True
  58. self.wakener.release()
  59. while True:
  60. if self.wakener.acquire():
  61. if not self.stop:
  62. self.wakener.wait()
  63. else:
  64. self.pipeline.stop()
  65. break
  66. self.wakener.release()
  67. def increase_stream(self, stream_id):
  68. self.wakener.acquire()
  69. if stream_id in self.stream_set:
  70. print("increase_stream() The stream is ongoing [{}]".format(stream_id))
  71. else:
  72. self.stream_set.add(stream_id)
  73. if self.stop:
  74. stop = False
  75. self.wakener.release()
  76. class PerfThread (threading.Thread):
  77. def __init__(self, pipeline):
  78. threading.Thread.__init__(self)
  79. self.pipeline = pipeline
  80. def run(self):
  81. print_performance(self.pipeline)
  82. def print_performance(pipeline):
  83. global g_perf_print_lock, g_perf_print_stop
  84. if pipeline.is_profiling_enabled():
  85. last_time = time.time()
  86. while True:
  87. g_perf_print_lock.acquire()
  88. if g_perf_print_stop:
  89. break
  90. g_perf_print_lock.release()
  91. elapsed_time = time.time() - last_time
  92. if elapsed_time < 2:
  93. time.sleep(2 - elapsed_time)
  94. last_time = time.time()
  95. # print whole process performance
  96. print_pipeline_performance(pipeline)
  97. # print real time performance (last 2 seconds)
  98. print_pipeline_performance(pipeline, 2000)
  99. g_perf_print_lock.release()
  100. class OneModuleObserver(ModuleObserver):
  101. def __init__(self) -> None:
  102. super().__init__()
  103. def notify(self, frame: 'probe data from one node'):
  104. cn_data = frame.get_cn_data_frame()
  105. frame_id = cn_data.frame_id
  106. stream_id = frame.stream_id
  107. print("receive the frame {} from {}".format(frame_id, stream_id))
  108. def receive_processed_frame(frame):
  109. cn_data = frame.get_cn_data_frame()
  110. frame_id = cn_data.frame_id
  111. stream_id = frame.stream_id
  112. print("receive the frame {} from {}".format(frame_id, stream_id))
  113. if cn_data.has_bgr_image():
  114. cv2.imwrite('{}/output/{}_frame_{}.jpg'.format(cur_file_dir, stream_id, frame_id), cn_data.image_bgr())
  115. objs = frame.get_cn_infer_objects().objs
  116. print("objects number: ", len(objs))
  117. for obj in objs:
  118. print("obj: id: {} score: {:.4f} bbox: {:.4f}, {:.4f}, {:.4f}, {:.4f}".format(
  119. obj.id, obj.score, obj.bbox.x, obj.bbox.y, obj.bbox.w, obj.bbox.h))
  120. def main():
  121. if not os.path.exists(cur_file_dir + "/output"):
  122. os.mkdir(cur_file_dir + "/output")
  123. model_file = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../data/models/yolov3_b4c4_argb_mlu270.cambricon")
  124. if not os.path.exists(model_file):
  125. os.makedirs(os.path.dirname(model_file),exist_ok=True)
  126. import urllib.request
  127. url_str = "http://video.cambricon.com/models/MLU270/yolov3_b4c4_argb_mlu270.cambricon"
  128. print('Downloading {} ...'.format(url_str))
  129. urllib.request.urlretrieve(url_str, model_file)
  130. global g_source_lock, g_perf_print_lock, g_perf_print_stop
  131. # Build a pipeline
  132. pipeline = Pipeline("my_pipeline")
  133. pipeline.build_pipeline_by_json_file('python_demo_config.json')
  134. # Set frame done callback
  135. pipeline.register_frame_done_callback(receive_processed_frame)
  136. # Probe one module's output in the pipeline, it's just for debugging
  137. # infer_module_observer = OneModuleObserver()
  138. # infer = pipeline.get_module('detector')
  139. # infer.set_module_observer(infer_module_observer)
  140. # Get pipeline's source module
  141. source_module_name = 'source'
  142. source = pipeline.get_source_module(source_module_name)
  143. # Set message observer
  144. obs = CustomObserver(pipeline, source)
  145. pipeline.stream_msg_observer = obs
  146. # Start the pipeline
  147. if not pipeline.start():
  148. print("Start pipeline failed.")
  149. return
  150. # Start a thread to print pipeline performance
  151. perf_th = PerfThread(pipeline)
  152. perf_th.start()
  153. # Define an input data handler
  154. mp4_path = "../../data/videos/cars.mp4"
  155. stream_num = 4
  156. for i in range(stream_num):
  157. stream_id = "stream_{}".format(i)
  158. file_handler = FileHandler(source, stream_id, mp4_path, -1)
  159. g_source_lock.acquire()
  160. if source.add_source(file_handler) != 0:
  161. print("Add source failed stream []".format(stream_id))
  162. else:
  163. obs.increase_stream(stream_id)
  164. g_source_lock.release()
  165. obs.wait_for_stop()
  166. if pipeline.is_profiling_enabled():
  167. g_perf_print_lock.acquire()
  168. g_perf_print_stop = True
  169. g_perf_print_lock.release()
  170. perf_th.join()
  171. print_pipeline_performance(pipeline)
  172. print("pipeline[{}] stops".format(pipeline.get_name()))
# Script entry point.
if __name__ == '__main__':
    main()