# YOLOv3 detection demo pipeline (CNStream Python sample).
import os
import sys

# Make the cnstream Python bindings (../lib, relative to this file) importable
# regardless of the current working directory. Must run before the cnstream import.
sys.path.append(os.path.split(os.path.realpath(__file__))[0] + "/../lib")

import math
import threading
import time

import cv2
import numpy as np

from cnstream import *

# Serializes add_source/remove_source calls between main() and the message observer.
g_source_lock = threading.Lock()
# Protects g_perf_print_stop, the shutdown flag read by the perf-print thread.
g_perf_print_lock = threading.Lock()
g_perf_print_stop = False
class CustomObserver(StreamMsgObserver):
    """Receives pipeline stream messages and tracks per-stream lifecycle.

    Removes finished or broken streams from the source module and wakes
    wait_for_stop() once every registered stream has ended.
    """

    def __init__(self, pipeline, source):
        StreamMsgObserver.__init__(self)
        self.pipeline = pipeline
        self.source = source
        self.stop = False
        # Guards self.stop / self.stream_set and signals wait_for_stop().
        self.wakener = threading.Condition()
        self.stream_set = set()

    def update(self, msg):
        """Callback invoked by the pipeline for every stream message.

        BUG FIX: the original early-returned while still holding both
        g_source_lock and self.wakener, deadlocking every later caller.
        'with' blocks now release both locks on every path.
        """
        global g_source_lock
        # g_source_lock serializes source add/remove against main().
        with g_source_lock, self.wakener:
            if self.stop:
                return
            if msg.type == StreamMsgType.eos_msg:
                print("pipeline[{}] stream[{}] gets EOS".format(
                    self.pipeline.get_name(), msg.stream_id))
                self._drop_stream(msg.stream_id)
            elif msg.type == StreamMsgType.stream_err_msg:
                print("pipeline[{}] stream[{}] gets stream error".format(
                    self.pipeline.get_name(), msg.stream_id))
                self._drop_stream(msg.stream_id)
            elif msg.type == StreamMsgType.error_msg:
                # Whole-pipeline error: tear down every stream at once.
                print("pipeline[{}] gets error".format(self.pipeline.get_name()))
                self.source.remove_sources()
                self.stream_set.clear()
                self.stop = True
            elif msg.type == StreamMsgType.frame_err_msg:
                # A single bad frame is logged but does not end the stream.
                print("pipeline[{}] stream[{}] gets frame error".format(
                    self.pipeline.get_name(), msg.stream_id))
            else:
                print("pipeline[{}] unknown message type".format(self.pipeline.get_name()))
            if self.stop:
                self.wakener.notify()

    def _drop_stream(self, stream_id):
        """Remove one stream; flag stop once no streams remain.

        Caller must hold g_source_lock and self.wakener.
        """
        if stream_id in self.stream_set:
            self.source.remove_source(stream_id)
            self.stream_set.remove(stream_id)
        if len(self.stream_set) == 0:
            print("pipeline[{}] received all EOS".format(self.pipeline.get_name()))
            self.stop = True

    def wait_for_stop(self):
        """Block until every registered stream has ended, then stop the pipeline."""
        with self.wakener:
            # No streams registered at all counts as "already finished".
            if len(self.stream_set) == 0:
                self.stop = True
            while not self.stop:
                self.wakener.wait()
        self.pipeline.stop()

    def increase_stream(self, stream_id):
        """Register a newly added stream id with the observer."""
        with self.wakener:
            if stream_id in self.stream_set:
                print("increase_stream() The stream is ongoing [{}]".format(stream_id))
            else:
                self.stream_set.add(stream_id)
                # BUG FIX: re-arm the observer. The original assigned to a
                # local variable ("stop = False"), leaving self.stop True.
                if self.stop:
                    self.stop = False
class PerfThread(threading.Thread):
    """Background thread that periodically reports pipeline performance."""

    def __init__(self, pipeline):
        super().__init__()
        self.pipeline = pipeline

    def run(self):
        # Delegate to the module-level performance printing loop.
        print_performance(self.pipeline)
def print_performance(pipeline):
    """Print pipeline performance roughly every 2 seconds until told to stop.

    Emits whole-run statistics plus the statistics of the last 2 seconds.
    Terminates when g_perf_print_stop is set under g_perf_print_lock.
    No-op when the pipeline has profiling disabled.
    """
    global g_perf_print_lock, g_perf_print_stop
    # Guard clause instead of wrapping the whole loop in an 'if'.
    if not pipeline.is_profiling_enabled():
        return
    last_time = time.time()
    while True:
        # 'with' releases the lock on every path; the original's manual
        # acquire/release pairing spanned the break and was not exception-safe.
        with g_perf_print_lock:
            if g_perf_print_stop:
                break
        # Pace the loop to one report every ~2 seconds.
        elapsed_time = time.time() - last_time
        if elapsed_time < 2:
            time.sleep(2 - elapsed_time)
        last_time = time.time()
        # Whole-process performance.
        print_pipeline_performance(pipeline)
        # Real-time performance (last 2 seconds).
        print_pipeline_performance(pipeline, 2000)
class Yolov3Preproc(Preproc):
    """Letterbox preprocessing producing flattened float32 0RGB model input."""

    def __init__(self):
        Preproc.__init__(self)

    def init(self, params):
        # No configurable parameters.
        return True

    def execute(self, input_shapes, frame_info):
        """Letterbox-resize the frame and pack it as a flattened 0RGB tensor.

        input_shapes[0] is indexed as [.., H, W, ..] below; returns a
        one-element list holding the flattened float32 data. The letterbox
        geometry is stashed in the frame's py_collection for Yolov3Postproc.
        """
        data_frame = frame_info.get_cn_data_frame()
        bgr = data_frame.image_bgr()
        src_w = data_frame.width
        src_h = data_frame.height
        dst_w = input_shapes[0][2]  # model input width
        dst_h = input_shapes[0][1]  # model input height
        # Resize keeping aspect ratio (letterbox).
        scaling_factor = min(dst_w / src_w, dst_h / src_h)
        unpad_w = math.ceil(scaling_factor * src_w)
        unpad_h = math.ceil(scaling_factor * src_h)
        resized = cv2.resize(bgr, (unpad_w, unpad_h))
        # BGR -> RGB via channel reversal.
        rgb = resized[:, :, ::-1]
        # Pad the short side(s) to the model input size with mid-gray.
        pad_w = dst_w - unpad_w
        pad_h = dst_h - unpad_h
        pad_l = math.floor(pad_w / 2)
        pad_t = math.floor(pad_h / 2)
        pad_r = pad_w - pad_l
        pad_b = pad_h - pad_t
        # BUG FIX: the pad color must be passed as the keyword 'value';
        # positionally it lands on copyMakeBorder's 'dst' parameter, so the
        # border was never filled with (128, 128, 128).
        dst_img = cv2.copyMakeBorder(rgb, pad_t, pad_b, pad_l, pad_r,
                                     cv2.BORDER_CONSTANT, value=(128, 128, 128))
        # Prepend a zero channel: 0RGB layout expected by the model.
        argb = cv2.merge([np.zeros((dst_h, dst_w, 1), np.uint8), dst_img])
        # Save letterbox params so the postprocessor can undo the transform.
        collection = frame_info.get_py_collection()
        collection['unpad_h'] = unpad_h
        collection['unpad_w'] = unpad_w
        collection['pad_l'] = pad_l
        collection['pad_t'] = pad_t
        return [np.asarray(argb).flatten().astype(np.float32)]
def to_range(val: float, min_val, max_val):
    """Clamp val into the inclusive range [min_val, max_val]."""
    if val < min_val:
        return min_val
    if val > max_val:
        return max_val
    return val
class Yolov3Postproc(Postproc):
    """Decodes raw YOLOv3 network output into CNInferObject detections."""

    def __init__(self):
        Postproc.__init__(self)
        self.__threshold = 0.3  # default minimum score to keep a detection

    def init(self, params):
        # Optional 'threshold' parameter overrides the default score cutoff.
        if 'threshold' in params:
            self.__threshold = float(params['threshold'])
        return True

    def execute(self, net_outputs, input_shapes, frame_info):
        """Convert net output boxes to normalized detections on frame_info.

        Output layout: net_output[0] is the box count; each box occupies 7
        values starting at offset 64 (label, score, l, t, r, b in
        model-input-normalized coordinates).
        """
        collection = frame_info.get_py_collection()
        unpad_h = collection['unpad_h']
        unpad_w = collection['unpad_w']
        pad_l = collection['pad_l']
        pad_t = collection['pad_t']
        input_h = input_shapes[0][1]  # model input height
        input_w = input_shapes[0][2]  # model input width
        net_output = net_outputs[0].flatten()
        box_num = int(net_output[0])
        objects = frame_info.get_cn_infer_objects()
        for box_id in range(box_num):
            base = 64 + box_id * 7
            score = net_output[base + 2]
            # Drop low-confidence boxes early.
            if score < self.__threshold:
                continue
            # Undo the letterbox transform, clamping into [0, 1].
            left = to_range((net_output[base + 3] * input_w - pad_l) / unpad_w, 0, 1)
            top = to_range((net_output[base + 4] * input_h - pad_t) / unpad_h, 0, 1)
            right = to_range((net_output[base + 5] * input_w - pad_l) / unpad_w, 0, 1)
            bottom = to_range((net_output[base + 6] * input_h - pad_t) / unpad_h, 0, 1)
            # Drop degenerate boxes.
            if left >= right or top >= bottom:
                continue
            detection = CNInferObject()
            detection.id = str(int(net_output[base + 1]))
            detection.score = score
            detection.bbox.x = left
            detection.bbox.y = top
            detection.bbox.w = right - left
            detection.bbox.h = bottom - top
            objects.push_back(detection)
def main():
    """Build, run, and tear down the YOLOv3 detection demo pipeline."""
    model_file = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                              "../../data/models/yolov3_b4c4_argb_mlu270.cambricon")
    if not os.path.exists(model_file):
        # Fetch the Cambricon model on first run.
        os.makedirs(os.path.dirname(model_file), exist_ok=True)
        import urllib.request
        url_str = "http://video.cambricon.com/models/MLU270/yolov3_b4c4_argb_mlu270.cambricon"
        print(f'Downloading {url_str} ...')
        urllib.request.urlretrieve(url_str, model_file)

    global g_source_lock, g_perf_print_lock, g_perf_print_stop

    # Build the pipeline from its JSON description.
    pipeline = Pipeline("yolov3_detection_pipeline")
    pipeline.build_pipeline_by_json_file('yolov3_detection_config.json')

    # Get the pipeline's source module.
    source_module_name = 'source'
    source = pipeline.get_source_module(source_module_name)

    # Watch stream messages so we know when every stream has finished.
    obs = CustomObserver(pipeline, source)
    pipeline.stream_msg_observer = obs

    if not pipeline.start():
        return

    # Print performance statistics in the background.
    perf_th = PerfThread(pipeline)
    perf_th.start()

    # Feed the input video stream(s); -1 = loop forever disabled (framerate).
    mp4_path = "../../data/videos/cars.mp4"
    stream_num = 1
    for i in range(stream_num):
        stream_id = "stream_id_{}".format(i)
        file_handler = FileHandler(source, stream_id, mp4_path, -1)
        with g_source_lock:
            if source.add_source(file_handler) != 0:
                # BUG FIX: original format string had "[]" with no {}
                # placeholder, so the stream id was never printed.
                print("Add source failed stream [{}]".format(stream_id))
            else:
                obs.increase_stream(stream_id)

    obs.wait_for_stop()

    if pipeline.is_profiling_enabled():
        # Stop the perf thread and emit a final summary.
        with g_perf_print_lock:
            g_perf_print_stop = True
        perf_th.join()
        print_pipeline_performance(pipeline)

    print("pipeline[{}] stops".format(pipeline.get_name()))


if __name__ == '__main__':
    main()