123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- import cv2
- import time
- import math
- import os
- import numpy as np
- import tensorflow.compat.v1 as tf
- import nms_locality
- from icdar import restore_rectangle, ground_truth_to_word
# OpenCV font constant; not referenced by any function visible in this file.
font = cv2.FONT_HERSHEY_SIMPLEX
# Root directory that get_images() walks to find input images.
test_folder = 'cs/'
# Frozen-graph (.pb) file that test() parses and imports.
# The commented-out path below is an alternative model file.
#pb_file_path = '/data2/hejinlong/model_saved/FOTS-master/savedmodel/0902/ocr_model-0902-2201.pb'
pb_file_path = '/data2/liudan/ocr/savemode/0801/ocr_model-0801-1920.pb'
def get_images(folder=None):
    """Recursively collect the image files found under a directory.

    Args:
        folder: Directory to walk. When omitted, the module-level
            ``test_folder`` is used, so the original zero-argument call
            keeps working.

    Returns:
        list: One path per image file, built by joining the file name with
        the directory ``os.walk`` reported it in. A file counts as an image
        when its extension is ``.jpg``, ``.jpeg`` or ``.png``, compared
        case-insensitively.
    """
    if folder is None:
        folder = test_folder
    image_exts = {'.jpg', '.jpeg', '.png'}
    files = []
    for parent, _dirnames, filenames in os.walk(folder):
        for filename in filenames:
            # Compare the real extension instead of a raw suffix: a bare
            # endswith('jpg') also accepts names such as "notes_jpg" and
            # rejects upper-case variants like ".PNG".
            if os.path.splitext(filename)[1].lower() in image_exts:
                files.append(os.path.join(parent, filename))
    print('Find {} images'.format(len(files)))
    return files
def detect(score_map, geo_map, timer, score_map_thresh=0.8, box_thresh=0.1, nms_thres=0.2):
    """Turn network score/geometry maps into scored quadrilateral boxes.

    Args:
        score_map: Per-pixel text score. A 4-D array is reduced to
            ``score_map[0, :, :, 0]`` before use.
        geo_map: Geometry map. When ``score_map`` is 4-D, ``geo_map[0]`` is
            used.
        timer: Dict whose ``'restore'`` and ``'nms'`` entries are overwritten
            with the elapsed seconds of the respective stage.
        score_map_thresh: Pixels scoring above this become box candidates.
        box_thresh: Boxes whose mean score inside the polygon is not above
            this are dropped after NMS.
        nms_thres: Threshold forwarded to ``nms_locality.nms_locality``.

    Returns:
        tuple: ``(boxes, timer)``. ``boxes`` holds one row per box with eight
        coordinates followed by a score, or is ``None`` when NMS leaves no
        box at all.
    """
    if score_map.ndim == 4:
        score_map = score_map[0, :, :, 0]
        geo_map = geo_map[0, :, :, ]

    # (row, col) positions above the threshold, ordered by row.
    candidates = np.argwhere(score_map > score_map_thresh)
    candidates = candidates[np.argsort(candidates[:, 0])]
    rows, cols = candidates[:, 0], candidates[:, 1]

    tic = time.time()
    # Positions are passed as (x, y) scaled by 4; presumably the maps are at
    # 1/4 of the input resolution -- confirm against restore_rectangle.
    quads = restore_rectangle(candidates[:, ::-1] * 4, geo_map[rows, cols, :])  # N*4*2
    num_quads = quads.shape[0]
    print('{} text boxes before nms'.format(num_quads))
    boxes = np.zeros((num_quads, 9), dtype=np.float32)
    boxes[:, :8] = quads.reshape((-1, 8))
    boxes[:, 8] = score_map[rows, cols]
    timer['restore'] = time.time() - tic

    tic = time.time()
    boxes = nms_locality.nms_locality(boxes.astype('float32'), nms_thres)
    timer['nms'] = time.time() - tic

    if boxes.shape[0] == 0:
        return None, timer

    # Re-score every surviving box with the mean of the score map inside its
    # polygon (coordinates divided by 4 to index the score map).
    for idx, quad in enumerate(boxes[:, :8]):
        region = np.zeros_like(score_map, dtype=np.uint8)
        polygon = quad.reshape((-1, 4, 2)).astype(np.int32) // 4
        cv2.fillPoly(region, polygon, 1)
        boxes[idx, 8] = cv2.mean(score_map, region)[0]

    keep = boxes[:, 8] > box_thresh
    return boxes[keep], timer
def get_project_matrix_and_width(text_polyses, target_height=8.0):
    """Build one affine matrix and one ROI width per detected quadrilateral.

    Args:
        text_polyses: Array with one row of eight values
            ``x1, y1, x2, y2, x3, y3, x4, y4`` per box. Every row is divided
            by 4 before use.
        target_height: Height of the rectified ROI. The ROI width is
            ``ceil(target_height * long_side / short_side)`` of the box's
            minimum-area rectangle, capped at 128.

    Returns:
        tuple: ``(project_matrixes, box_widths)`` as numpy arrays. Each
        matrix is the flattened six-element result of
        ``cv2.getAffineTransform`` mapping the ROI corners ``(0, 0)``,
        ``(width, 0)``, ``(0, target_height)`` onto the box corners 1, 2
        and 4.
    """
    max_width = 128
    project_matrixes = []
    box_widths = []
    for poly in text_polyses:
        x1, y1, x2, y2, x3, y3, x4, y4 = poly / 4
        # cv2.minAreaRect only accepts float32/int32 point sets.
        corners = np.array([[x1, y1], [x2, y2], [x3, y3], [x4, y4]], dtype=np.float32)
        box_w, box_h = cv2.minAreaRect(corners)[1]
        if box_w <= box_h:
            box_w, box_h = box_h, box_w

        if box_h > 0:
            width_box = math.ceil(target_height * box_w / box_h)
        elif box_w > 0:
            # Zero-height box: the aspect ratio is unbounded, so use the cap
            # instead of dividing by zero.
            width_box = max_width
        else:
            # Single-point box: treat the aspect ratio as 1.
            width_box = math.ceil(target_height)
        width_box = int(min(width_box, max_width))

        src_pts = np.float32([(x1, y1), (x2, y2), (x4, y4)])
        dst_pts = np.float32([(0, 0), (width_box, 0), (0, target_height)])
        # The transform goes from ROI coordinates to box coordinates.
        affine_matrix = cv2.getAffineTransform(dst_pts, src_pts)
        project_matrixes.append(affine_matrix.flatten())
        box_widths.append(width_box)

    return np.array(project_matrixes), np.array(box_widths)
- def sort_poly(p):
- min_axis = np.argmin(np.sum(p, axis=1))
- p = p[[min_axis, (min_axis+1)%4, (min_axis+2)%4, (min_axis+3)%4]]
- if abs(p[0, 0] - p[1, 0]) > abs(p[0, 1] - p[1, 1]):
- return p
- else:
- return p[[0, 3, 2, 1]]
- from shapely.geometry import Polygon
- import copy
def sort_by_area(boxes):
    """Order boxes from largest to smallest polygon area.

    Args:
        boxes: Iterable of boxes; the first eight values of each box are
            read as four ``(x, y)`` corners.

    Returns:
        tuple: ``(idx_lst, sorted_boxes)``. ``idx_lst`` lists the original
        positions in descending-area order and ``sorted_boxes`` is a numpy
        array of the boxes in that order. Boxes with equal areas keep their
        original relative order.
    """
    areas = [Polygon(box[:8].reshape((4, 2))).area for box in boxes]
    # Sort positions rather than looking areas up with list.index():
    # index() returns the first match, so equal areas used to yield the same
    # box repeatedly and lose the others.
    idx_lst = sorted(range(len(areas)), key=areas.__getitem__, reverse=True)
    box_lst = [boxes[pos] for pos in idx_lst]
    return idx_lst, np.array(box_lst)
def sort_box_by_dist(boxes):
    """Order boxes by how close their centre is to the centre of all boxes.

    The distance is the L1 distance between a box's mean corner position and
    the mean of every corner of every box.

    Args:
        boxes: Numpy array that can be reshaped to ``(-1, 4, 2)``.

    Returns:
        tuple: ``(idx_lst, sorted_boxes)``. ``idx_lst`` lists the original
        positions from nearest to farthest and ``sorted_boxes`` is a numpy
        array of the boxes (in their original per-box shape) in that order.
        Boxes at equal distance keep their original relative order.
    """
    quads = boxes.reshape((-1, 4, 2))
    if quads.shape[0] == 0:
        # Nothing to sort; also avoids taking the mean of an empty array.
        return [], np.array([])

    xs, ys = quads[..., 0], quads[..., 1]
    dists = np.abs(np.mean(xs, axis=1) - np.mean(xs)) + np.abs(np.mean(ys, axis=1) - np.mean(ys))
    # A stable argsort yields each position exactly once. The previous
    # list.index() lookup returned the first match for tied distances and so
    # duplicated one box while dropping another.
    idx_lst = np.argsort(dists, kind='stable').tolist()
    box_lst = [boxes[pos] for pos in idx_lst]
    return idx_lst, np.array(box_lst)
def test():
    """Run detection and recognition on every image under ``test_folder``.

    The frozen graph at ``pb_file_path`` is imported twice: once to fetch the
    score and geometry tensors, once to fetch the recognition output. Each
    image is padded with zeros to a square, resized to 512x512 when it is
    larger than that, and run through detection and then recognition. The
    box coordinates and decoded text are written to ``outputs/001.txt`` and
    an annotated image is written below ``outputs/``.

    Raises:
        SystemExit: With status -1 when the recognition output does not have
            one entry per detected box.
    """
    os.makedirs('outputs/', exist_ok=True)

    with tf.Session(config=tf.ConfigProto(allow_soft_placement=True)) as sess:
        input_images = tf.placeholder(tf.float32, shape=[None, None, None, 3], name='input_images')
        input_transform_matrix = tf.placeholder(tf.float32, shape=[None, 6], name='input_transform_matrix')
        input_box_mask = [tf.placeholder(tf.int32, shape=[None], name='input_box_masks_0')]
        input_box_widths = tf.placeholder(tf.int32, shape=[None], name='input_box_widths')

        with open(pb_file_path, 'rb') as f:
            graph_def = tf.GraphDef()
            graph_def.ParseFromString(f.read())

        # Detection outputs: only the image placeholder is remapped.
        output1 = tf.import_graph_def(
            graph_def,
            input_map={'input_images:0': input_images},
            return_elements=['feature_fusion/Conv_7/Sigmoid:0', 'feature_fusion/concat_3:0'],
        )
        # Recognition output: additionally needs the ROI transforms, masks
        # and widths computed from the detected boxes.
        output2 = tf.import_graph_def(
            graph_def,
            input_map={
                'input_images:0': input_images,
                'input_transform_matrix:0': input_transform_matrix,
                'input_box_masks_0:0': input_box_mask[0],
                'input_box_widths:0': input_box_widths,
            },
            return_elements=['SparseToDense:0'],
        )

        input_size = 512
        for im_fn in get_images():
            raw = cv2.imread(im_fn)
            if raw is None:
                # cv2.imread returns None for unreadable files; skip them
                # instead of failing on the channel flip below.
                print('Skip unreadable image: {}'.format(im_fn))
                continue
            im = raw[:, :, ::-1]  # reverse the channel axis
            new_h, new_w, _ = im.shape

            # Pad to a square at the top-left corner, then shrink to
            # input_size when the square is larger than that.
            scale = 1.0
            max_h_w_i = np.max([new_h, new_w, input_size])
            im_padded = np.zeros((max_h_w_i, max_h_w_i, 3), dtype=np.uint8)
            im_padded[:new_h, :new_w, :] = im
            if max_h_w_i == input_size:
                im = im_padded
            else:
                im = cv2.resize(im_padded, dsize=(input_size, input_size))
                scale = input_size / max_h_w_i

            start_time = time.time()
            timer = {'net': 0, 'restore': 0, 'nms': 0}
            start = time.time()
            score, geometry = sess.run(output1, feed_dict={input_images: [im]})
            boxes, timer = detect(score_map=score, geo_map=geometry, timer=timer)

            # NOTE(review): every image writes to this same file, so only
            # the last image's results survive -- confirm this is intended.
            res_file = 'outputs/001.txt'
            report_name = im_fn

            if boxes is None or boxes.shape[0] == 0:
                timer['net'] = time.time() - start
                # No detections: leave an empty result file.
                with open(res_file, 'w'):
                    pass
            else:
                input_roi_boxes = boxes[:, :8].reshape(-1, 8)
                boxes_masks = [0] * input_roi_boxes.shape[0]
                transform_matrixes, box_widths = get_project_matrix_and_width(input_roi_boxes)
                try:
                    recog_decode = sess.run(output2, feed_dict={
                        input_images: [im],
                        input_transform_matrix: transform_matrixes,
                        input_box_mask[0]: boxes_masks,
                        input_box_widths: box_widths,
                    })[0]
                except Exception as err:
                    # Best effort: report the failure, leave an empty result
                    # file and move on to the next image.
                    print('Recognition failed for {}: {}'.format(im_fn, err))
                    with open(res_file, 'w') as f:
                        f.write('')
                    continue
                timer['net'] = time.time() - start

                boxes = boxes[:, :8].reshape((-1, 4, 2))
                if recog_decode.shape[0] != boxes.shape[0]:
                    print("detection and recognition result are not equal!")
                    raise SystemExit(-1)

                # Optional hook: detections could be reordered here with
                # sort_box_by_dist(); recog_decode would have to be permuted
                # with the same indices.

                text_tags = []
                boxes_hegd = []
                with open(res_file, 'w') as f:
                    for i, box in enumerate(boxes):
                        # Map back to original-image coordinates in floating
                        # point. Dividing inside an int32 array truncated the
                        # result, which made the later rounding a no-op.
                        box = sort_poly(box.astype(np.int32)).astype(np.float64) / scale
                        # Drop boxes with a side shorter than 5 pixels.
                        if np.linalg.norm(box[0] - box[1]) < 5 or np.linalg.norm(box[3] - box[0]) < 5:
                            continue
                        # Drop boxes reaching more than 10% outside the image.
                        if (np.any(box[..., 0] > new_w * 1.1) or np.any(box[..., 1] > new_h * 1.1)
                                or np.any(box[..., 0] < -new_w * 0.1) or np.any(box[..., 1] < -new_h * 0.1)):
                            continue
                        recognition_result = ground_truth_to_word(recog_decode[i])
                        box = np.rint(box).astype(np.int32)
                        text_tags.append(recognition_result)
                        boxes_hegd.append(box)
                        f.write('{},{},{},{},{},{},{},{},{}\r\n'.format(
                            box[0, 0], box[0, 1], box[1, 0], box[1, 1],
                            box[2, 0], box[2, 1], box[3, 0], box[3, 1],
                            recognition_result
                        ))

                # Imported lazily so the helper is only required once there
                # is something to draw.
                from box_a_pic import box_pic
                boxes_hegd = np.array(boxes_hegd)
                if len(boxes_hegd) == 0:
                    im_txt1 = raw
                else:
                    im_txt1, _, _ = box_pic(boxes_hegd, text_tags, im_fn)

                # Mirror the input's path relative to test_folder below
                # outputs/. relpath avoids the mis-split that str.split()
                # produced when 'cs/' occurs inside a sub-directory name.
                report_name = os.path.relpath(im_fn, test_folder)
                img_path = os.path.join('outputs', report_name)
                print(img_path)
                dir_path = os.path.dirname(img_path)
                if dir_path:
                    os.makedirs(dir_path, exist_ok=True)
                cv2.imwrite(img_path, im_txt1)

            print('{} : net {:.0f}ms, restore {:.0f}ms, nms {:.0f}ms'.format(
                report_name, timer['net'] * 1000, timer['restore'] * 1000, timer['nms'] * 1000))
            duration = time.time() - start_time
            print('[timing] {}'.format(duration))
# Run the inference loop only when this file is executed as a script.
if __name__ == '__main__':
    test()
|